diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 0beec62..9174ea9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -769,6 +769,10 @@ public static boolean isAclEnabled(Configuration conf) { + "container-queuing-enabled"; public static final boolean NM_CONTAINER_QUEUING_ENABLED_DEFAULT = false; + /** Class to be used for opportunistic container management on the NM.*/ + public static final String NM_CONTAINER_OPPORTUNISTIC_CONTAINER_MANAGER = + NM_PREFIX + "opportunistic-container-manager.class"; + /** Environment variables that will be sent to containers.*/ public static final String NM_ADMIN_USER_ENV = NM_PREFIX + "admin-env"; public static final String DEFAULT_NM_ADMIN_USER_ENV = "MALLOC_ARENA_MAX=$MALLOC_ARENA_MAX"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/OpportunisticContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/OpportunisticContainerManager.java new file mode 100644 index 0000000..2b59ce7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/OpportunisticContainerManager.java @@ -0,0 +1,88 @@ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import 
org.apache.hadoop.yarn.exceptions.YarnException; + +import java.io.IOException; +import java.util.Iterator; + +/** + * Abstract base class for managing the queuing of opportunistic containers. + * It allows preemption behavior and other details to be abstracted out. + */ +public abstract class OpportunisticContainerManager { + + /** + * Add a container to the queue + * + * @param containerInfo the container to queue + */ + public abstract void queueContainer( + QueuingContainerManagerImpl.AllocatedContainerInfo containerInfo); + + /** + * Preempt the given container + * + * @param containerInfo the container to preempt + * @throws YarnException + * @throws IOException + */ + public abstract void preemptContainer( + QueuingContainerManagerImpl.AllocatedContainerInfo containerInfo) + throws YarnException, IOException; + + /** + * Start the given container + * + * @param containerInfo the container to run + * @throws YarnException + * @throws IOException + */ + public abstract void startContainer( + QueuingContainerManagerImpl.AllocatedContainerInfo containerInfo) + throws YarnException, IOException; + + /** + * Returns true if there are no queued containers + * + * @return true if the queue is empty, false otherwise + */ + public abstract boolean isQueueEmpty(); + + /** + * Get the containers that are queued (containers that have not + * run) + * + * @return an iterator over the queued containers + */ + public abstract Iterator + getQueuedContainers(); + + /** + * Get the containers that are ready to be run (containers that have not + * run or were preempted and can be resumed) + * + * @return an iterator over the containers that are ready to be run + */ + public abstract Iterator + getContainersToRun(); + + /** + * Remove the container from the queue + * + * @return true if the container was removed and false otherwise + */ + public abstract boolean removeContainer(ContainerId containerId); + + /** + * Initialize the manager + */ + public abstract void initialize(QueuingContainerManagerImpl containerManager); + + /** + * Returns the number of 
queued containers + * + * @return count of queued containers + */ + public abstract int getQueuedContainersCount(); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/OpportunisticContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/OpportunisticContainerManagerImpl.java new file mode 100644 index 0000000..e168965 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/OpportunisticContainerManagerImpl.java @@ -0,0 +1,134 @@ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * Default implementation for OpportunisticContainerManager. 
+ */ +public class OpportunisticContainerManagerImpl extends + OpportunisticContainerManager { + + // Queue for maintaining the queued opportunistic containers + private Queue + queuedContainers; + + // QueuingContainerManager reference + private QueuingContainerManagerImpl queuingContainerManager; + + /** + * Create a manager with an empty container queue + */ + public OpportunisticContainerManagerImpl() { + this.queuedContainers = new ConcurrentLinkedQueue<>(); + } + + /** + * Initialize the manager + */ + @Override + public void initialize(QueuingContainerManagerImpl containerManager){ + this.queuingContainerManager = containerManager; + } + + /** + * Add a container to the queue + * @param containerInfo the container to queue + */ + @Override + public void queueContainer( + QueuingContainerManagerImpl.AllocatedContainerInfo containerInfo) { + this.queuedContainers.add(containerInfo); + } + + /** + * Preempt the given container + * @param containerInfo the container to preempt + * @throws YarnException + * @throws IOException + */ + @Override + public void preemptContainer( + QueuingContainerManagerImpl.AllocatedContainerInfo containerInfo) + throws YarnException, IOException { + this.queuingContainerManager.stopContainerInternal( + containerInfo.getPti().getContainerId()); + } + + /** + * Start the given container + * @param containerInfo the container to run + * @throws YarnException + * @throws IOException + */ + @Override public void startContainer( + QueuingContainerManagerImpl.AllocatedContainerInfo containerInfo) + throws YarnException, IOException { + this.queuingContainerManager.startAllocatedContainerInternal(containerInfo); + } + + /** + * Get the containers that are queued (containers that have not + * run) + * @return an iterator over the queued containers + */ + @Override + public Iterator + getQueuedContainers() { + return this.queuedContainers.iterator(); + } + + /** + * Get the containers that are ready to be run (containers that have not + * run or were preempted and can be resumed) + * @return an iterator over the queued containers + */ 
+ @Override + public Iterator + getContainersToRun() { + return this.queuedContainers.iterator(); + } + + /** + * Remove the container from the queue + * @return true if the container was removed and false otherwise + */ + @Override public boolean removeContainer(ContainerId containerId) { + boolean foundInQueue = false; + Iterator iter = + this.queuedContainers.iterator(); + while (iter.hasNext() && !foundInQueue) { + if (iter.next().getPti().getContainerId().equals(containerId)) { + iter.remove(); + foundInQueue = true; + } + } + return foundInQueue; + } + + /** + * Returns true if there are no queued containers + * @return true if the queue is empty, false otherwise + */ + @Override + public boolean isQueueEmpty() { + return this.queuedContainers.isEmpty(); + } + + /** + * Returns the number of queued containers + * + * @return count of queued containers + */ + @Override + public int getQueuedContainersCount(){ + return this.queuedContainers.size(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index 38b1b07..86c060c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; import 
org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -81,7 +82,7 @@ allocatedOpportunisticContainers; private Queue queuedGuaranteedContainers; - private Queue queuedOpportunisticContainers; + private OpportunisticContainerManager opportunisticContainerManager; private Set opportunisticContainersToKill; private final ContainerQueuingLimit queuingLimit; @@ -94,13 +95,23 @@ public QueuingContainerManagerImpl(Context context, ContainerExecutor exec, this.allocatedGuaranteedContainers = new ConcurrentHashMap<>(); this.allocatedOpportunisticContainers = new ConcurrentHashMap<>(); this.queuedGuaranteedContainers = new ConcurrentLinkedQueue<>(); - this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>(); this.opportunisticContainersToKill = Collections.synchronizedSet( new HashSet()); this.queuingLimit = ContainerQueuingLimit.newInstance(); } @Override + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + opportunisticContainerManager = ReflectionUtils.newInstance( + conf.getClass( + YarnConfiguration.NM_CONTAINER_OPPORTUNISTIC_CONTAINER_MANAGER, + OpportunisticContainerManagerImpl.class, + OpportunisticContainerManager.class), conf); + opportunisticContainerManager.initialize(this); + } + + @Override protected EventHandler createApplicationEventDispatcher() { return new QueuingApplicationEventDispatcher( super.createApplicationEventDispatcher()); @@ -122,7 +133,7 @@ protected void startContainerInternal( // there are no queued containers waiting to be executed, start this // container immediately. if (queuedGuaranteedContainers.isEmpty() && - queuedOpportunisticContainers.isEmpty() && + opportunisticContainerManager.isQueueEmpty() && getContainersMonitor(). 
hasResourcesAvailable(allocatedContInfo.getPti())) { startAllocatedContainer(allocatedContInfo); @@ -140,7 +151,8 @@ protected void startContainerInternal( } else { LOG.info("Opportunistic container {} will be queued at the NM.", cIdToStart); - queuedOpportunisticContainers.add(allocatedContInfo); + opportunisticContainerManager.queueContainer( + allocatedContInfo); } } } @@ -181,6 +193,17 @@ protected void stopContainerInternal(ContainerId containerID) } /** + * Start an allocated container + * @param allocatedContainerInfo the container to start + */ + protected void startAllocatedContainerInternal(AllocatedContainerInfo + allocatedContainerInfo) throws IOException, YarnException { + super.startContainerInternal( + allocatedContainerInfo.getContainerTokenIdentifier(), + allocatedContainerInfo.getStartRequest()); + } + + /** * Start the execution of the given container. Also add it to the allocated * containers, and update allocated resource utilization. */ @@ -205,9 +228,13 @@ private void startAllocatedContainer( this.context.getQueuingContext().getQueuedContainers().remove(containerId); try { LOG.info("Starting container [" + containerId + "]"); - super.startContainerInternal( - allocatedContainerInfo.getContainerTokenIdentifier(), - allocatedContainerInfo.getStartRequest()); + if(allocatedContainerInfo.getExecutionType() == + ExecutionType.GUARANTEED){ + startAllocatedContainerInternal(allocatedContainerInfo); + } + else { + opportunisticContainerManager.startContainer(allocatedContainerInfo); + } } catch (YarnException | IOException e) { containerFailedToStart(pti.getContainerId(), allocatedContainerInfo.getContainerTokenIdentifier()); @@ -233,19 +260,22 @@ private void containerFailedToStart(ContainerId containerId, */ private boolean removeQueuedContainer(ContainerId containerId, ExecutionType executionType) { - Queue queue = - (executionType == ExecutionType.GUARANTEED) ? 
- queuedGuaranteedContainers : queuedOpportunisticContainers; boolean foundInQueue = false; - Iterator iter = queue.iterator(); - while (iter.hasNext() && !foundInQueue) { - if (iter.next().getPti().getContainerId().equals(containerId)) { - iter.remove(); - foundInQueue = true; + if(executionType == ExecutionType.GUARANTEED) { + Iterator iter = + queuedGuaranteedContainers.iterator(); + + while (iter.hasNext() && !foundInQueue) { + if (iter.next().getPti().getContainerId().equals(containerId)) { + iter.remove(); + foundInQueue = true; + } } } - + else if(executionType == ExecutionType.OPPORTUNISTIC) { + foundInQueue = opportunisticContainerManager.removeContainer(containerId); + } return foundInQueue; } @@ -292,13 +322,18 @@ private void killOpportunisticContainers( AllocatedContainerInfo allocatedContInfo) { ContainerId containerToStartId = allocatedContInfo.getPti() .getContainerId(); - List extraOpportContainersToKill = + List extraOpportContainersToKill = pickOpportunisticContainersToKill(containerToStartId); + ContainerId contIdToKill; // Kill the opportunistic containers that were chosen. - for (ContainerId contIdToKill : extraOpportContainersToKill) { + for (AllocatedContainerInfo contInfoToKill : extraOpportContainersToKill) { + contIdToKill = contInfoToKill.getPti().getContainerId(); try { - stopContainerInternalIfRunning(contIdToKill); + // If the container is running then preempt it + if(this.context.getContainers().containsKey(contIdToKill)){ + this.opportunisticContainerManager.preemptContainer(contInfoToKill); + } } catch (YarnException | IOException e) { LOG.error("Container did not get removed successfully.", e); } @@ -318,11 +353,12 @@ private void killOpportunisticContainers( * resources occupied by opportunistic containers. * @return the additional opportunistic containers that need to be killed. 
*/ - protected List pickOpportunisticContainersToKill( + protected List pickOpportunisticContainersToKill( ContainerId containerToStartId) { // The additional opportunistic containers that need to be killed for the // given container to start. - List extraOpportContainersToKill = new ArrayList<>(); + List extraOpportContainersToKill = + new ArrayList<>(); // Track resources that need to be freed. ResourceUtilization resourcesToFreeUp = resourcesToFreeUp( containerToStartId); @@ -344,7 +380,7 @@ private void killOpportunisticContainers( } if (!opportunisticContainersToKill.contains(runningOpportContId)) { - extraOpportContainersToKill.add(runningOpportContId); + extraOpportContainersToKill.add(runningOpportCont.getValue()); opportunisticContainersToKill.add(runningOpportContId); getContainersMonitor().decreaseResourceUtilization(resourcesToFreeUp, runningOpportCont.getValue().getPti()); @@ -415,26 +451,26 @@ private ResourceUtilization resourcesToFreeUp( private void startPendingContainers() { // Start pending guaranteed containers, if resources available. boolean resourcesAvailable = - startContainersFromQueue(queuedGuaranteedContainers); + startContainersFromQueue(queuedGuaranteedContainers.iterator()); // Start opportunistic containers, if resources available. if (resourcesAvailable) { - startContainersFromQueue(queuedOpportunisticContainers); + startContainersFromQueue(opportunisticContainerManager. 
+ getContainersToRun()); } } private boolean startContainersFromQueue( - Queue queuedContainers) { - Iterator guarIter = queuedContainers.iterator(); + Iterator iter) { boolean resourcesAvailable = true; - while (guarIter.hasNext() && resourcesAvailable) { - AllocatedContainerInfo allocatedContInfo = guarIter.next(); + while (iter.hasNext() && resourcesAvailable) { + AllocatedContainerInfo allocatedContInfo = iter.next(); if (getContainersMonitor().hasResourcesAvailable( allocatedContInfo.getPti())) { startAllocatedContainer(allocatedContInfo); - guarIter.remove(); + iter.remove(); } else { resourcesAvailable = false; } @@ -499,7 +535,7 @@ protected void recoverActiveContainer( // guaranteed container. killOpportunisticContainers(allocatedContInfo); } else { - queuedOpportunisticContainers.add(allocatedContInfo); + opportunisticContainerManager.queueContainer(allocatedContInfo); } } else { super.recoverActiveContainer(launchContext, token, rcs); @@ -523,7 +559,7 @@ public int getNumQueuedGuaranteedContainers() { @VisibleForTesting public int getNumQueuedOpportunisticContainers() { - return queuedOpportunisticContainers.size(); + return this.opportunisticContainerManager.getQueuedContainersCount(); } class QueuingApplicationEventDispatcher implements @@ -567,7 +603,7 @@ public void updateQueuingLimit(ContainerQueuingLimit limit) { private void shedQueuedOpportunisticContainers() { int numAllowed = this.queuingLimit.getMaxQueueLength(); Iterator containerIter = - queuedOpportunisticContainers.iterator(); + opportunisticContainerManager.getQueuedContainers(); while (containerIter.hasNext()) { AllocatedContainerInfo cInfo = containerIter.next(); if (numAllowed <= 0) {