diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index eaa249f..c0cefe1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2618,6 +2618,13 @@ public static boolean isAclEnabled(Configuration conf) { "org.apache.hadoop.yarn.server.timelineservice.storage" + ".HBaseTimelineReaderImpl"; + public static final String NM_CONTAINER_SCHEDULER_CLASS = + NM_PREFIX + "container-scheduler.class"; + + public static final String DEFAULT_NM_CONTAINER_SCHEDULER_CLASS = + "org.apache.hadoop.yarn.server.nodemanager.containermanager." + + "scheduler.ContainerScheduler"; + /** * default schema prefix for hbase tables. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java index 2aeb245..4364cb6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java @@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor .ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler - .ContainerScheduler; + .AbstractContainerScheduler; /** * The ContainerManager is an entity that manages the life cycle of Containers. 
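The two keys added above make the NM container scheduler pluggable. A minimal sketch of selecting a non-default implementation (MyContainerScheduler is hypothetical; it must extend AbstractContainerScheduler and expose the (Context, AsyncDispatcher, NodeManagerMetrics) constructor that ContainerManagerImpl resolves reflectively below):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.yarn.conf.YarnConfiguration;

  // Point the NM at a custom scheduler class (illustrative, not part of this patch).
  Configuration conf = new YarnConfiguration();
  conf.set(YarnConfiguration.NM_CONTAINER_SCHEDULER_CLASS,
      "org.example.MyContainerScheduler");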
@@ -42,6 +42,6 @@ void updateQueuingLimit(ContainerQueuingLimit queuingLimit); - ContainerScheduler getContainerScheduler(); + AbstractContainerScheduler getContainerScheduler(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 3470910..3182c1c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.AbstractContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,7 +146,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; @@ -164,6 +164,7 @@ import java.io.DataInputStream; import java.io.IOException; +import java.lang.reflect.Constructor; import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -219,7 +220,7 @@ private final WriteLock writeLock; private AMRMProxyService amrmProxyService; protected boolean amrmProxyEnabled = false; - private final ContainerScheduler containerScheduler; + private final AbstractContainerScheduler containerScheduler; private long waitForContainersOnShutdownMillis; @@ -234,7 +235,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, this.dirsHandler = dirsHandler; // ContainerManager level dispatcher. - dispatcher = new AsyncDispatcher("NM ContainerManager dispatcher"); + dispatcher = createDispatcher(); this.deletionService = deletionContext; this.metrics = metrics; @@ -289,6 +290,10 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, this.writeLock = lock.writeLock(); } + protected AsyncDispatcher createDispatcher() { + return new AsyncDispatcher("NM ContainerManager dispatcher"); + } + @Override public void serviceInit(Configuration conf) throws Exception { @@ -335,12 +340,32 @@ protected void createAMRMProxyService(Configuration conf) { } } - @VisibleForTesting - protected ContainerScheduler createContainerScheduler(Context cntxt) { - // Currently, this dispatcher is shared by the ContainerManager, - // all the containers, the container monitor and all the container. - // The ContainerScheduler may use its own dispatcher. 
- return new ContainerScheduler(cntxt, dispatcher, metrics); + protected AbstractContainerScheduler createContainerScheduler(Context cntxt) { + String containerSchedulerClassName = context.getConf().get( + YarnConfiguration.NM_CONTAINER_SCHEDULER_CLASS, + YarnConfiguration.DEFAULT_NM_CONTAINER_SCHEDULER_CLASS); + try { + Class<?> containerSchedulerClazz = Class.forName( + containerSchedulerClassName); + if (AbstractContainerScheduler.class.isAssignableFrom( + containerSchedulerClazz)) { + Class<?>[] paramType = { + Context.class, AsyncDispatcher.class, NodeManagerMetrics.class }; + Object[] params = { context, dispatcher, metrics }; + Constructor<?> constructor = + containerSchedulerClazz.getConstructor(paramType); + return (AbstractContainerScheduler) constructor.newInstance(params); + } else { + throw new YarnRuntimeException(containerSchedulerClassName + + " is not an instance of " + + AbstractContainerScheduler.class.getCanonicalName()); + } + } catch (Exception e) { + throw new YarnRuntimeException("Could not instantiate " + + AbstractContainerScheduler.class.getName() + ": " + + containerSchedulerClassName, e); + } + } protected ContainersMonitor createContainersMonitor(ContainerExecutor exec) { @@ -1701,7 +1726,7 @@ public OpportunisticContainersStatus getOpportunisticContainersStatus() { @Override public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) { - this.containerScheduler.updateQueuingLimit(queuingLimit); + this.containerScheduler.updateOpportunisticContainerQueuingLimit(queuingLimit); } @SuppressWarnings("unchecked") @@ -1868,7 +1893,7 @@ private void internalSignalToContainer(SignalContainerRequest request, } @Override - public ContainerScheduler getContainerScheduler() { + public AbstractContainerScheduler getContainerScheduler() { return this.containerScheduler; } }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 3875cbc..5fb9b51 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -1074,7 +1074,7 @@ public void resumeContainer() throws IOException { * @return Process ID * @throws Exception */ - private String getContainerPid(Path pidFilePath) throws Exception { + protected String getContainerPid(Path pidFilePath) throws Exception { String containerIdStr = container.getContainerId().toString(); String processId = null;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index cfd5d6a..c3d0a4d 100644 ---
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -116,8 +116,7 @@ public void handle(ContainersLauncherEvent event) { containerId.getApplicationAttemptId().getApplicationId()); ContainerLaunch launch = - new ContainerLaunch(context, getConfig(), dispatcher, exec, app, - event.getContainer(), dirsHandler, containerManager); + createContainerLaunch(app, event.getContainer()); containerLauncher.submit(launch); running.put(containerId, launch); break; @@ -213,4 +212,10 @@ public void handle(ContainersLauncherEvent event) { break; } } + + protected ContainerLaunch createContainerLaunch( + Application app, Container container) { + return new ContainerLaunch(context, getConfig(), dispatcher, + exec, app, container, dirsHandler, containerManager); + } }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java index 64831e9..38983c5e8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java @@ -22,12 +22,19 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; public interface ContainersMonitor extends Service, EventHandler<ContainersMonitorEvent>, ResourceView { ResourceUtilization getContainersUtilization(); + /** + * Get the utilization thresholds under which the NM starts over-allocating.
+ * @return overallocation thresholds + */ + ResourceThresholds getOverallocationThresholds(); + float getVmemRatio(); void subtractNodeResourcesFromResourceUtilization( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index acc256f..3e19019 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -23,6 +23,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -230,7 +232,7 @@ protected void serviceInit(Configuration myConf) throws Exception { /** * Check all prerequisites for NM over-allocation. */ - private void checkOverAllocationPrerequisites() throws YarnException { + protected void checkOverAllocationPrerequisites() throws YarnException { // LinuxContainerExecutor is required to enable overallocation if (!(containerExecutor instanceof LinuxContainerExecutor)) { throw new YarnException(LinuxContainerExecutor.class.getName() + @@ -609,7 +611,7 @@ public void run() { } // Save the aggregated utilization of the containers - setContainersUtilization(trackedContainersUtilization); + updateContainersUtilization(trackedContainersUtilization); // Publish the container utilization metrics to node manager // metrics system. 
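The renamed updateContainersUtilization() above no longer just caches the snapshot; as the next hunk shows, it also nudges the scheduler. A sketch of the headroom math a scheduler can apply to the monitor's new getOverallocationThresholds() API (the helper name is hypothetical; the arithmetic mirrors getResourcesAvailableBasedOnUtilization() later in this patch):

  import org.apache.hadoop.yarn.api.records.ResourceUtilization;
  import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;

  // Memory still available under over-allocation: threshold * capacity
  // minus what containers actually use, converted back to MB.
  static long memoryHeadroomMB(ResourceThresholds thresholds,
      ResourceUtilization used, long pmemForContainersBytes) {
    long limitBytes = (long) (thresholds.getMemoryThreshold()
        * (double) pmemForContainersBytes);
    long usedBytes = (long) used.getPhysicalMemory() << 20;
    return (limitBytes - usedBytes) >> 20;
  }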
@@ -977,10 +979,32 @@ public boolean isVmemCheckEnabled() { @Override public ResourceUtilization getContainersUtilization() { return this.containersUtilization; + // TODO get the latest result instead of cached value } - private void setContainersUtilization(ResourceUtilization utilization) { + @Override + public ResourceThresholds getOverallocationThresholds() { + ResourceThresholds overAllocationThresholds = null; + if (context.getOverAllocationInfo() != null) { + overAllocationThresholds = + context.getOverAllocationInfo().getOverAllocationThresholds(); + } + return overAllocationThresholds; + } + + private void updateContainersUtilization(ResourceUtilization utilization) { this.containersUtilization = utilization; + + attemptToStartContainersUponLowUtilization(); + // TODO check for preemption if no containers to start + } + + @VisibleForTesting + public boolean attemptToStartContainersUponLowUtilization() { + eventDispatcher.getEventHandler().handle( + new ContainerSchedulerEvent(null, + ContainerSchedulerEventType.SCHEDULE_CONTAINERS)); + return true; } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AbstractContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AbstractContainerScheduler.java new file mode 100644 index 0000000..fad8074 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AbstractContainerScheduler.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * An abstract container scheduler that queues opportunistic containers. + */ +public abstract class AbstractContainerScheduler + extends AbstractService implements EventHandler<ContainerSchedulerEvent> { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractContainerScheduler.class); + + protected final Context nmContext; + protected final AsyncDispatcher dispatcher; + protected final NodeManagerMetrics metrics; + + public AbstractContainerScheduler(Context context, AsyncDispatcher dispatcher, + NodeManagerMetrics metrics, String serviceName) { + super(serviceName); + this.nmContext = context; + this.metrics = metrics; + this.dispatcher = dispatcher; + } + + /** + * Handle ContainerSchedulerEvents. + * @param event ContainerSchedulerEvent. + */ + @Override + public void handle(ContainerSchedulerEvent event) { + switch (event.getType()) { + case SCHEDULE_CONTAINER: + onScheduleContainer(event.getContainer()); + break; + case CONTAINER_PAUSED: + // NOTE: Is sent only after container state has changed to PAUSED... + onContainerPaused(event.getContainer()); + break; + case CONTAINER_COMPLETED: + // NOTE: Is sent only after container state has changed to DONE... + onContainerCompleted(event.getContainer()); + break; + case UPDATE_CONTAINER: + onUpdateContainer((UpdateContainerSchedulerEvent) event); + break; + case SHED_QUEUED_CONTAINERS: + shedQueuedOpportunisticContainers(); + break; + case RECOVERY_COMPLETED: + onRecoveryCompleted(); + break; + case SCHEDULE_CONTAINERS: + onScheduleContainers(); + break; + default: + LOG.error("Unknown event arrived at AbstractContainerScheduler: " + + event.toString()); + } + } + + protected abstract void onScheduleContainer(Container container); + + protected abstract void onContainerPaused(Container container); + + protected abstract void onContainerCompleted(Container container); + + protected abstract void onUpdateContainer(UpdateContainerSchedulerEvent event); + + protected abstract void shedQueuedOpportunisticContainers(); + + protected abstract void onRecoveryCompleted(); + + protected abstract void onScheduleContainers(); + + /** + * Recover an active container upon NM + * recovery. + * @param container the container recovered + * @param rcs the recovered container status + */ + public abstract void recoverActiveContainer( + Container container, RecoveredContainerStatus rcs); + + /** + * Get the status of opportunistic containers.
+ * @return opportunistic containers status + */ + public abstract OpportunisticContainersStatus + getOpportunisticContainersStatus(); + + /** + * Update the max number of opportunistic containers that can be + * simultaneously queued. + * @param limit the max number of queued opportunistic containers allowed + */ + public abstract void updateOpportunisticContainerQueuingLimit( + ContainerQueuingLimit limit); + + /** + * Get the current resource utilization of all containers. + * @return resource utilization + */ + @VisibleForTesting + public abstract ResourceUtilization getCurrentUtilization(); + + public ContainersMonitor getContainersMonitor() { + return nmContext.getContainerManager().getContainersMonitor(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java index 6e2b617..704fddb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java @@ -35,9 +35,9 @@ LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class); private ResourceUtilization containersAllocation; - private ContainerScheduler scheduler; + private AbstractContainerScheduler scheduler; - AllocationBasedResourceUtilizationTracker(ContainerScheduler scheduler) { + AllocationBasedResourceUtilizationTracker(AbstractContainerScheduler scheduler) { this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f); this.scheduler = scheduler; } @@ -56,7 +56,7 @@ public ResourceUtilization getCurrentUtilization() { * @param container Container. */ @Override - public void addContainerResources(Container container) { + public void containerLaunched(Container container) { ContainersMonitor.increaseResourceUtilization( getContainersMonitor(), this.containersAllocation, container.getResource()); @@ -67,7 +67,7 @@ public void addContainerResources(Container container) { * @param container Container. 
*/ @Override - public void subtractContainerResource(Container container) { + public void containerReleased(Container container) { ContainersMonitor.decreaseResourceUtilization( getContainersMonitor(), this.containersAllocation, container.getResource());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index d9b713f..9ff1017 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -20,14 +20,12 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -61,13 +59,11 @@ * met. It also ensures that OPPORTUNISTIC containers are killed to make * room for GUARANTEED containers. */ -public class ContainerScheduler extends AbstractService implements - EventHandler<ContainerSchedulerEvent> { +public class ContainerScheduler extends AbstractContainerScheduler { private static final Logger LOG = LoggerFactory.getLogger(ContainerScheduler.class); - private final Context context; // Capacity of the queue for opportunistic Containers. private final int maxOppQueueLength; @@ -100,9 +96,6 @@ // increases / decreases based on container start / finish private ResourceUtilizationTracker utilizationTracker; - private final AsyncDispatcher dispatcher; - private final NodeManagerMetrics metrics; - private Boolean usePauseEventForPreemption = false; /** @@ -133,10 +126,7 @@ public void serviceInit(Configuration conf) throws Exception { @VisibleForTesting public ContainerScheduler(Context context, AsyncDispatcher dispatcher, NodeManagerMetrics metrics, int qLength) { - super(ContainerScheduler.class.getName()); - this.context = context; - this.dispatcher = dispatcher; - this.metrics = metrics; + super(context, dispatcher, metrics, ContainerScheduler.class.getName()); this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength; this.utilizationTracker = new AllocationBasedResourceUtilizationTracker(this); @@ -144,52 +134,48 @@ public ContainerScheduler(Context context, AsyncDispatcher dispatcher, OpportunisticContainersStatus.newInstance(); } - /** - * Handle ContainerSchedulerEvents. - * @param event ContainerSchedulerEvent.
- */ @Override - public void handle(ContainerSchedulerEvent event) { - switch (event.getType()) { - case SCHEDULE_CONTAINER: - scheduleContainer(event.getContainer()); - break; - // NOTE: Is sent only after container state has changed to PAUSED... - case CONTAINER_PAUSED: - // NOTE: Is sent only after container state has changed to DONE... - case CONTAINER_COMPLETED: - onResourcesReclaimed(event.getContainer()); - break; - case UPDATE_CONTAINER: - if (event instanceof UpdateContainerSchedulerEvent) { - onUpdateContainer((UpdateContainerSchedulerEvent) event); - } else { - LOG.error("Unknown event type on UpdateCOntainer: " + event.getType()); - } - break; - case SHED_QUEUED_CONTAINERS: - shedQueuedOpportunisticContainers(); - break; - case RECOVERY_COMPLETED: - startPendingContainers(maxOppQueueLength <= 0); - default: - LOG.error("Unknown event arrived at ContainerScheduler: " - + event.toString()); - } + protected void onScheduleContainer(Container container) { + scheduleContainer(container); + } + + @Override + protected void onContainerPaused(Container container) { + onResourcesReclaimed(container); + } + + @Override + protected void onContainerCompleted(Container container) { + onResourcesReclaimed(container); + } + + @Override + protected void onUpdateContainer(UpdateContainerSchedulerEvent event) { + handleContainerUpdate(event); + } + + @Override + protected void onRecoveryCompleted() { + startPendingContainers(maxOppQueueLength <= 0); + } + + @Override + protected void onScheduleContainers() { + // ignore out-of-band container scheduling decision } /** * We assume that the ContainerManager has already figured out what kind * of update this is. */ - private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { + private void handleContainerUpdate(UpdateContainerSchedulerEvent updateEvent) { ContainerId containerId = updateEvent.getContainer().getContainerId(); if (updateEvent.isResourceChange()) { if (runningContainers.containsKey(containerId)) { - this.utilizationTracker.subtractContainerResource( + this.utilizationTracker.containerReleased( new ContainerImpl(getConfig(), null, null, null, null, - updateEvent.getOriginalToken(), context)); - this.utilizationTracker.addContainerResources( + updateEvent.getOriginalToken(), nmContext)); + this.utilizationTracker.containerLaunched( updateEvent.getContainer()); getContainersMonitor().handle( new ChangeMonitoringContainerResourceEvent(containerId, @@ -246,7 +232,7 @@ public void recoverActiveContainer(Container container, } } else if (rcs == RecoveredContainerStatus.LAUNCHED) { runningContainers.put(container.getContainerId(), container); - utilizationTracker.addContainerResources(container); + utilizationTracker.containerLaunched(container); } } @@ -332,7 +318,7 @@ private void onResourcesReclaimed(Container container) { // only a running container releases resources upon completion boolean resourceReleased = completedContainer != null; if (resourceReleased) { - this.utilizationTracker.subtractContainerResource(container); + this.utilizationTracker.containerReleased(container); if (container.getContainerTokenIdentifier().getExecutionType() == ExecutionType.OPPORTUNISTIC) { this.metrics.completeOpportunisticContainer(container.getResource()); @@ -421,7 +407,7 @@ private boolean enqueueContainer(Container container) { if (isQueued) { try { - this.context.getNMStateStore().storeContainerQueued( + nmContext.getNMStateStore().storeContainerQueued( container.getContainerId()); } catch (IOException e) { LOG.warn("Could not store 
container [" + container.getContainerId() @@ -502,7 +488,7 @@ private void reclaimOpportunisticContainerResources(Container container) { private void startContainer(Container container) { LOG.info("Starting container [" + container.getContainerId()+ "]"); runningContainers.put(container.getContainerId(), container); - this.utilizationTracker.addContainerResources(container); + this.utilizationTracker.containerLaunched(container); if (container.getContainerTokenIdentifier().getExecutionType() == ExecutionType.OPPORTUNISTIC) { this.metrics.startOpportunisticContainer(container.getResource()); @@ -591,7 +577,8 @@ private ResourceUtilization resourcesToFreeUp( } @SuppressWarnings("unchecked") - public void updateQueuingLimit(ContainerQueuingLimit limit) { + public void updateOpportunisticContainerQueuingLimit( + ContainerQueuingLimit limit) { this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength()); // YARN-2886 should add support for wait-times. Include wait time as // well once it is implemented @@ -604,7 +591,8 @@ public void updateQueuingLimit(ContainerQueuingLimit limit) { } } - private void shedQueuedOpportunisticContainers() { + @Override + protected void shedQueuedOpportunisticContainers() { int numAllowed = this.queuingLimit.getMaxQueueLength(); Iterator containerIter = queuedOpportunisticContainers.values().iterator(); @@ -626,10 +614,6 @@ private void shedQueuedOpportunisticContainers() { } } - public ContainersMonitor getContainersMonitor() { - return this.context.getContainerManager().getContainersMonitor(); - } - @VisibleForTesting public ResourceUtilization getCurrentUtilization() { return this.utilizationTracker.getCurrentUtilization(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index 294eddf..545b16c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -28,5 +28,6 @@ // Producer: Node HB response - RM has asked to shed the queue SHED_QUEUED_CONTAINERS, CONTAINER_PAUSED, - RECOVERY_COMPLETED + RECOVERY_COMPLETED, + SCHEDULE_CONTAINERS } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/OpportunisticContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/OpportunisticContainerScheduler.java new file mode 100644 index 0000000..ec5c9bb --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/OpportunisticContainerScheduler.java @@ -0,0 +1,432 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ChangeMonitoringContainerResourceEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; +import org.apache.hadoop.yarn.util.resource.Resources; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * An implementation of AbstractContainerScheduler that queues opportunistic + * containers but launches guaranteed containers immediately. It relies on + * external triggers to determine when queued opportunistic containers can + * be launched or must be preempted. It does not support pausing + * opportunistic containers. + * + */ +public class OpportunisticContainerScheduler + extends AbstractContainerScheduler { + private static final Logger LOG = + LoggerFactory.getLogger(OpportunisticContainerScheduler.class); + + private volatile ContainerQueuingLimit opportunisticContainerQueuingLimit = + ContainerQueuingLimit.newInstance(); + + // Queue of Opportunistic Containers waiting for resources to run + protected final LinkedHashMap<ContainerId, Container> + queuedOpportunisticContainers = new LinkedHashMap<>(0); + + // Used to keep track of containers that have been marked to be killed + // to make room for a guaranteed container. + private final Map<ContainerId, Container> + opportunisticContainersToKill = new HashMap<>(0); + + // Sum of resources to be released by opportunistic containers that + // have been marked to be killed.
+ private final Resource opportunisticResourcesToBeReleased = + Resource.newInstance(0, 0); + + // Containers launched by the scheduler will take a while to actually + // move to the RUNNING state. This holds containers that are in RUNNING + // as well as those in SCHEDULED state that have been marked to run, + // but not yet RUNNING. + private final LinkedHashMap<ContainerId, Container> runningContainers = + new LinkedHashMap<>(); + private ResourceUtilizationTracker utilizationTracker; + + private ContainersMonitor containersMonitor; + + public OpportunisticContainerScheduler(Context context, + AsyncDispatcher dispatcher, NodeManagerMetrics metrics) { + super(context, dispatcher, metrics, + OpportunisticContainerScheduler.class.getName()); + utilizationTracker = new AllocationBasedResourceUtilizationTracker(this); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + int initialMaxQueueLength = conf.getInt( + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, + YarnConfiguration.DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH); + opportunisticContainerQueuingLimit.setMaxQueueLength(initialMaxQueueLength); + + containersMonitor = nmContext.getContainerManager().getContainersMonitor(); + super.serviceInit(conf); + } + + @Override + protected void onScheduleContainer(Container container) { + boolean isGuaranteedContainer = container.getContainerTokenIdentifier(). + getExecutionType() == ExecutionType.GUARANTEED; + + if (isGuaranteedContainer) { + // always start guaranteed containers immediately + startContainer(container); + } else { + enqueueOpportunisticContainer(container); + } + } + + protected boolean enqueueOpportunisticContainer(Container container) { + // always try to enqueue opportunistic containers + boolean isQueued = false; + ContainerId containerId = container.getContainerId(); + long maxOpportunisticQueueLength = opportunisticContainerQueuingLimit. + getMaxQueueLength(); + if (queuedOpportunisticContainers.size() < maxOpportunisticQueueLength) { + LOG.info("Opportunistic container {} will be queued at the NM.", + containerId); + queuedOpportunisticContainers.put(containerId, container); + isQueued = true; + } else { + LOG.info("Opportunistic container [{}] will not be queued at the NM " + + "since max queue length [{}] has been reached", + containerId, maxOpportunisticQueueLength); + container.sendKillEvent( + ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER, + "Opportunistic container queue is full."); + } + + if (isQueued) { + try { + nmContext.getNMStateStore().storeContainerQueued(containerId); + } catch (IOException e) { + LOG.warn("Could not store container [" + containerId + + "] state. The Container has been queued.", e); + } + } + return isQueued; + } + + @Override + protected void onContainerPaused(Container container) { + // do nothing because container pause is not supported + } + + @Override + protected void onContainerCompleted(Container container) { + ContainerId containerId = container.getContainerId(); + + // The container could have been killed externally, e.g. by the + // ContainerManager, in which case it might still be queued.
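+ // A container that never left the queue was never launched, so + // dequeuing it is the only cleanup required.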
+ if (queuedOpportunisticContainers.remove(containerId) != null) { + return; + } + + if (opportunisticContainersToKill.remove(containerId) != null) { + Resources.subtractFrom( + opportunisticResourcesToBeReleased, container.getResource()); + } + + if (runningContainers.remove(containerId) != null) { + // decrement only if it was a running container + utilizationTracker.containerReleased(container); + if (container.getContainerTokenIdentifier().getExecutionType() == + ExecutionType.OPPORTUNISTIC) { + metrics.completeOpportunisticContainer(container.getResource()); + } + } + } + + @Override + protected void onUpdateContainer(UpdateContainerSchedulerEvent event) { + + ContainerId containerId = event.getContainer().getContainerId(); + if (event.isResourceChange()) { + if (runningContainers.containsKey(containerId)) { + utilizationTracker.containerReleased( + new ContainerImpl(getConfig(), null, null, null, null, + event.getOriginalToken(), nmContext)); + utilizationTracker.containerLaunched( + event.getContainer()); + getContainersMonitor().handle( + new ChangeMonitoringContainerResourceEvent(containerId, + event.getUpdatedToken().getResource())); + } + } + + if (event.isExecTypeUpdate()) { + // Promotion or not (Increase signifies either a promotion + // or container size increase) + if (event.isIncrease()) { + // Promotion of a queued container. + Container container = queuedOpportunisticContainers.remove(containerId); + if (container != null) { + startContainer(container); + } + // TODO handle promotion while process is running + } else { + // TODO handle demotion while process is running + } + } + } + + @Override + protected void shedQueuedOpportunisticContainers() { + int numAllowed = opportunisticContainerQueuingLimit.getMaxQueueLength(); + Iterator<Container> containerIter = + queuedOpportunisticContainers.values().iterator(); + while (containerIter.hasNext()) { + Container container = containerIter.next(); + if (numAllowed <= 0) { + container.sendKillEvent( + ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER, + "Container De-queued to meet NM queuing limits."); + containerIter.remove(); + LOG.info( + "Opportunistic container {} will be killed to meet NM queuing" + + " limits.", container.getContainerId()); + } + numAllowed--; + } + } + + @Override + protected void onRecoveryCompleted() { + // do nothing + } + + @Override + protected void onScheduleContainers() { + // try to launch opportunistic containers in queue + Resource available = getAvailableResourcesForOpportunisticContainers(); + startOpportunisticContainers(available); + } + + /** + * Get the amount of resources available to launch OPPORTUNISTIC + * containers. + * @return the resource available + */ + private Resource getAvailableResourcesForOpportunisticContainers() { + Resource resourceBasedOnAllocation = + getResourceAvailableBasedOnAllocation(); + Resource resourceBasedOnUtilization = + getResourcesAvailableBasedOnUtilization(); + if (LOG.isDebugEnabled()) { + LOG.debug("The amount of resources available based on allocation is " + + resourceBasedOnAllocation + ", based on utilization is " + + resourceBasedOnUtilization); + } + return Resources.componentwiseMax(resourceBasedOnAllocation, + resourceBasedOnUtilization); + } + + /** + * Get the amount of resources that have not been allocated.
+ * @return unallocated resources + */ + private Resource getResourceAvailableBasedOnAllocation() { + // unallocated resources = node capacity - containers allocation + // = -(container allocation - node capacity) + ResourceUtilization allocationClone = ResourceUtilization.newInstance( + utilizationTracker.getCurrentUtilization()); + getContainersMonitor() + .subtractNodeResourcesFromResourceUtilization(allocationClone); + + Resource unallocated = Resources.none(); + if (allocationClone.getCPU() <= 0 && + allocationClone.getPhysicalMemory() <= 0 && + allocationClone.getVirtualMemory() <= 0) { + int cpu = Math.round(allocationClone.getCPU() * + getContainersMonitor().getVCoresAllocatedForContainers()); + long memory = allocationClone.getPhysicalMemory(); + unallocated = Resource.newInstance(-memory, -cpu); + } + return unallocated; + } + + /** + * Get the amount of resources available based on over-allocation + * if it is turned on. + * @return resources available + */ + private Resource getResourcesAvailableBasedOnUtilization() { + ResourceThresholds overAllocationThresholds = + containersMonitor.getOverallocationThresholds(); + + if (overAllocationThresholds == null) { + return Resources.none(); + } + + ResourceUtilization utilization = getCurrentNodeUtilization(); + long memoryAvailable = Math.round( + overAllocationThresholds.getMemoryThreshold() * + containersMonitor.getPmemAllocatedForContainers()) - + (utilization.getPhysicalMemory() << 20); + int vcoreAvailable = Math.round( + (overAllocationThresholds.getCpuThreshold() - utilization.getCPU()) * + containersMonitor.getVCoresAllocatedForContainers()); + return Resource.newInstance(memoryAvailable >> 20, vcoreAvailable); + } + + private ResourceUtilization getCurrentNodeUtilization() { + // TODO the resource utilization can be very stale, get the latest instead + return containersMonitor.getContainersUtilization(); + } + + /** + * Try to launch as many OPPORTUNISTIC containers as possible. 
+ * @param available the amount of resources available to launch containers + * @return true if all OPPORTUNISTIC containers are launched + * or there are no OPPORTUNISTIC containers to launch + */ + private boolean startOpportunisticContainers(Resource available) { + Iterator<Container> cIter = + queuedOpportunisticContainers.values().iterator(); + boolean resourcesAvailable = true; + while (cIter.hasNext() && resourcesAvailable) { + Container container = cIter.next(); + if (isResourceAvailable(available, container)) { + startContainer(container); + Resources.subtractFrom(available, container.getResource()); + cIter.remove(); + } else { + resourcesAvailable = false; + } + } + return resourcesAvailable; + } + + /** + * Check if the given resource is enough to launch a given container. + * @param resource the remaining resource + * @param container the container to launch + * @return true if the container can be launched with the given resource + */ + protected static boolean isResourceAvailable( + Resource resource, Container container) { + Resource left = Resources.subtract(resource, container.getResource()); + return left.getMemorySize() >= 0 && left.getVirtualCores() >= 0; + } + + + @Override + public void recoverActiveContainer(Container container, + RecoveredContainerStatus rcs) { + ExecutionType execType = + container.getContainerTokenIdentifier().getExecutionType(); + ContainerId containerId = container.getContainerId(); + if (rcs == RecoveredContainerStatus.QUEUED) { + if (execType == ExecutionType.GUARANTEED) { + startContainer(container); + } else if (execType == ExecutionType.OPPORTUNISTIC) { + queuedOpportunisticContainers + .put(container.getContainerId(), container); + } else { + LOG.error( + "Unknown execution type received " + container.getContainerId() + + ", execType " + execType); + } + } else if (rcs == RecoveredContainerStatus.LAUNCHED) { + runningContainers.put(containerId, container); + utilizationTracker.containerLaunched(container); + } + } + + @Override + public OpportunisticContainersStatus getOpportunisticContainersStatus() { + OpportunisticContainersStatus status = OpportunisticContainersStatus.newInstance(); + status.setQueuedOpportContainers( + queuedOpportunisticContainers.size()); + status.setWaitQueueLength( + queuedOpportunisticContainers.size()); + status.setOpportMemoryUsed( + metrics.getAllocatedOpportunisticGB()); + status.setOpportCoresUsed( + metrics.getAllocatedOpportunisticVCores()); + status.setRunningOpportContainers( + metrics.getRunningOpportunisticContainers()); + status.setOpportQueueCapacity( + opportunisticContainerQueuingLimit.getMaxQueueLength()); + return status; + } + + @Override + public void updateOpportunisticContainerQueuingLimit( + ContainerQueuingLimit limit) { + opportunisticContainerQueuingLimit = limit; + // YARN-2886 should add support for wait-times.
Include wait time as + // well once it is implemented + if ((opportunisticContainerQueuingLimit.getMaxQueueLength() > -1) && + (opportunisticContainerQueuingLimit.getMaxQueueLength() < + queuedOpportunisticContainers.size())) { + dispatcher.getEventHandler().handle( + new ContainerSchedulerEvent(null, + ContainerSchedulerEventType.SHED_QUEUED_CONTAINERS)); + } + } + + @Override + public ResourceUtilization getCurrentUtilization() { + return utilizationTracker.getCurrentUtilization(); + } + + private void startContainer(Container container) { + LOG.info("Launching container [" + container.getContainerId()+ "]"); + runningContainers.put(container.getContainerId(), container); + utilizationTracker.containerLaunched(container); + if (container.getContainerTokenIdentifier().getExecutionType() == + ExecutionType.OPPORTUNISTIC) { + metrics.startOpportunisticContainer(container.getResource()); + } + container.sendLaunchEvent(); + } + + @VisibleForTesting + public int getNumberOfRunningContainers() { + return runningContainers.size(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java index 3c17eca..0501b85 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java @@ -41,13 +41,13 @@ * Add Container's resources to Node Utilization. * @param container Container. */ - void addContainerResources(Container container); + void containerLaunched(Container container); /** * Subtract Container's resources to Node Utilization. * @param container Container. */ - void subtractContainerResource(Container container); + void containerReleased(Container container); /** * Check if NM has resources available currently to run the container. 
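The tracker interface now reads as lifecycle callbacks. A bare-bones illustration of the renamed pair (this class is hypothetical and not part of the patch; real implementations such as AllocationBasedResourceUtilizationTracker also account CPU and virtual memory):

  import org.apache.hadoop.yarn.api.records.ResourceUtilization;
  import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;

  class PmemOnlyUtilizationTracker {
    private final ResourceUtilization allocation =
        ResourceUtilization.newInstance(0, 0, 0.0f);

    // Mirror of containerLaunched(): charge the container's allocation.
    public void containerLaunched(Container container) {
      allocation.addTo((int) container.getResource().getMemorySize(), 0, 0f);
    }

    // Mirror of containerReleased(): give the allocation back.
    public void containerReleased(Container container) {
      allocation.subtractFrom(
          (int) container.getResource().getMemorySize(), 0, 0f);
    }
  }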
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 93d0afb..9d68e51 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; +import org.apache.hadoop.yarn.api.records.ContainerSubState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -345,6 +346,42 @@ public static void waitForContainerState( fStates.contains(containerStatus.getState())); } + public static void waitForContainerSubState( + ContainerManagementProtocol containerManager, ContainerId containerID, + ContainerSubState finalState) + throws InterruptedException, YarnException, IOException { + waitForContainerSubState(containerManager, containerID, + Arrays.asList(finalState), 20); + } + + public static void waitForContainerSubState( + ContainerManagementProtocol containerManager, ContainerId containerID, + List<ContainerSubState> finalStates, int timeOutMax) + throws InterruptedException, YarnException, IOException { + List<ContainerId> list = new ArrayList<>(); + list.add(containerID); + GetContainerStatusesRequest request = + GetContainerStatusesRequest.newInstance(list); + ContainerStatus containerStatus; + HashSet<ContainerSubState> fStates = new HashSet<>(finalStates); + int timeoutSecs = 0; + do { + Thread.sleep(1000); + containerStatus = + containerManager.getContainerStatuses(request) + .getContainerStatuses().get(0); + LOG.info("Waiting for container to get into one of states " + fStates + + ". 
Current state is " + containerStatus.getContainerSubState()); + timeoutSecs += 1; + } while (!fStates.contains(containerStatus.getContainerSubState()) + && timeoutSecs < timeOutMax); + LOG.info("Container state is " + containerStatus.getContainerSubState()); + Assert.assertTrue("ContainerSubState is not correct (timedout)", + fStates.contains(containerStatus.getContainerSubState())); + } + + + public static void waitForApplicationState( ContainerManagerImpl containerManager, ApplicationId appID, ApplicationState finalState) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 6d198a4..83cf6f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -34,10 +35,12 @@ import java.io.FileReader; import java.io.IOException; import java.io.PrintWriter; +import java.lang.reflect.Constructor; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -108,6 +111,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.AbstractContainerScheduler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -203,6 +208,12 @@ protected UserGroupInformation getRemoteUgi() throws YarnException { .getKeyId())); return ugi; } + + @Override + protected AbstractContainerScheduler createContainerScheduler( + Context context) { + return new ContainerScheduler(context, dispatcher, metrics); + } }; } @@ -538,10 +549,12 @@ private String doRestartTests(ContainerId cId, File oldStartFile, ResourceUtilization beforeUpgrade = ResourceUtilization.newInstance( containerManager.getContainerScheduler().getCurrentUtilization()); + LOG.info("Before upgrade utilization: " + beforeUpgrade); prepareContainerUpgrade(autoCommit, false, false, cId, newStartFile); ResourceUtilization afterUpgrade = ResourceUtilization.newInstance( 
containerManager.getContainerScheduler().getCurrentUtilization()); + LOG.info("After upgrade utilization: " + afterUpgrade); Assert.assertEquals("Possible resource leak detected !!", beforeUpgrade, afterUpgrade); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index cad835c..adadc2f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -99,6 +99,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.AbstractContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; @@ -619,7 +620,7 @@ private void commonLaunchContainer(ApplicationId appId, ContainerId cid, .containermanager.container.ContainerState.RUNNING); } - private ContainerManagerImpl createContainerManager(Context context, + protected ContainerManagerImpl createContainerManager(Context context, DeletionService delSrvc) { return new ContainerManagerImpl(context, exec, delSrvc, mock(NodeStatusUpdater.class), metrics, dirsHandler) { @@ -633,7 +634,7 @@ protected void authorizeGetAndStopContainerRequest( } } @Override - protected ContainerScheduler createContainerScheduler(Context context) { + protected AbstractContainerScheduler createContainerScheduler(Context context) { return new ContainerScheduler(context, dispatcher, metrics){ @Override public ContainersMonitor getContainersMonitor() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecoveryWithOpportunisticContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecoveryWithOpportunisticContainerScheduler.java new file mode 100644 index 0000000..edd6970 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecoveryWithOpportunisticContainerScheduler.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.AbstractContainerScheduler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.OpportunisticContainerScheduler; + +import static org.mockito.Mockito.mock; + +/** + * Test ContainerManager recovery when Opportunistic Container Scheduler + * is enabled. + */ +public class TestContainerManagerRecoveryWithOpportunisticContainerScheduler + extends TestContainerManagerRecovery { + public TestContainerManagerRecoveryWithOpportunisticContainerScheduler() + throws UnsupportedFileSystemException { + } + + protected ContainerManagerImpl createContainerManager( + Context context, DeletionService delSrvc) { + return new ContainerManagerImpl(context, exec, delSrvc, + mock(NodeStatusUpdater.class), metrics, dirsHandler) { + @Override + protected void authorizeGetAndStopContainerRequest( + ContainerId containerId, Container container, + boolean stopRequest, NMTokenIdentifier identifier) + throws YarnException { + if(container == null || container.getUser().equals("Fail")){ + throw new YarnException("Reject this container"); + } + } + @Override + protected AbstractContainerScheduler createContainerScheduler( + Context context) { + return new OpportunisticContainerScheduler(context, dispatcher, metrics) { + @Override + public ContainersMonitor getContainersMonitor() { + return new ContainersMonitorImpl(null, null, null) { + @Override + public float getVmemRatio() { + return 2.0f; + } + + @Override + public long getVmemAllocatedForContainers() { + return 20480; + } + + @Override + public long getPmemAllocatedForContainers() { + return (long) 2048 << 20; + } + + @Override + public long getVCoresAllocatedForContainers() { + return 4; + } + }; + } + }; + } + }; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerWithOpportunisticContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerWithOpportunisticContainerScheduler.java new file mode 100644 index 0000000..47e6b4a --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerWithOpportunisticContainerScheduler.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.AbstractContainerScheduler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.OpportunisticContainerScheduler; + +public class TestContainerManagerWithOpportunisticContainerScheduler + extends TestContainerManager { + public TestContainerManagerWithOpportunisticContainerScheduler() + throws UnsupportedFileSystemException { + } + + @Override + protected ContainerManagerImpl createContainerManager( + DeletionService delSrvc) { + return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, + metrics, dirsHandler) { + + @Override + protected UserGroupInformation getRemoteUgi() throws YarnException { + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context + .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey() + .getKeyId())); + return ugi; + } + + @Override + protected AbstractContainerScheduler createContainerScheduler( + Context context) { + return new OpportunisticContainerScheduler(context, dispatcher, metrics); + } + }; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java index 82c2147..88e3bdd 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java @@ -68,7 +68,7 @@ public void testHasResourcesAvailable() { when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4)); for (int i = 0; i < 2; i++) { Assert.assertTrue(tracker.hasResourcesAvailable(testContainer)); - tracker.addContainerResources(testContainer); + tracker.containerLaunched(testContainer); } Assert.assertFalse(tracker.hasResourcesAvailable(testContainer)); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerBehaviorCompatibility.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerBehaviorCompatibility.java index 5b99285..3bd6ae8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerBehaviorCompatibility.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerBehaviorCompatibility.java @@ -24,7 +24,10 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Before; @@ -41,6 +44,7 @@ */ public class TestContainerSchedulerBehaviorCompatibility extends BaseContainerManagerTest { + public TestContainerSchedulerBehaviorCompatibility() throws UnsupportedFileSystemException { super(); @@ -78,7 +82,8 @@ public void testForceStartGuaranteedContainersWhenOppContainerDisabled() StartContainersRequest.newInstance(list); containerManager.startContainers(allRequests); - ContainerScheduler cs = containerManager.getContainerScheduler(); + ContainerScheduler cs = + (ContainerScheduler) containerManager.getContainerScheduler(); int nQueuedContainers = cs.getNumQueuedContainers(); int nRunningContainers = cs.getNumRunningContainers(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java index 5c72e7e..92e7094 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java @@ -166,6 +166,12 @@ public long getVCoresAllocatedForContainers() { } }; } + + @Override + protected AbstractContainerScheduler createContainerScheduler( + Context context) { + return new ContainerScheduler(context, dispatcher, metrics); + } }; } @@ -320,7 +326,7 @@ public void testQueueMultipleContainers() throws Exception { } ContainerScheduler containerScheduler = - containerManager.getContainerScheduler(); + (ContainerScheduler) containerManager.getContainerScheduler(); // Ensure both containers are properly queued. Assert.assertEquals(2, containerScheduler.getNumQueuedContainers()); Assert.assertEquals(1, @@ -390,7 +396,7 @@ public void testStartAndQueueMultipleContainers() throws Exception { } ContainerScheduler containerScheduler = - containerManager.getContainerScheduler(); + (ContainerScheduler) containerManager.getContainerScheduler(); // Ensure two containers are properly queued. Assert.assertEquals(2, containerScheduler.getNumQueuedContainers()); Assert.assertEquals(0, @@ -473,7 +479,7 @@ public void testStartOpportunistcsWhenOppQueueIsFull() throws Exception { } ContainerScheduler containerScheduler = - containerManager.getContainerScheduler(); + (ContainerScheduler) containerManager.getContainerScheduler(); Assert.assertEquals(maxOppQueueLength, containerScheduler.getNumQueuedContainers()); Assert.assertEquals(0, @@ -578,7 +584,7 @@ public void testKillOpportunisticForGuaranteedContainer() throws Exception { @Test public void testPauseOpportunisticForGuaranteedContainer() throws Exception { containerManager.start(); - containerManager.getContainerScheduler(). + ((ContainerScheduler) containerManager.getContainerScheduler()). setUsePauseEventForPreemption(true); Listener listener = new Listener(); @@ -759,7 +765,7 @@ public void testQueueShedding() throws Exception { containerManager.startContainers(allRequests); ContainerScheduler containerScheduler = - containerManager.getContainerScheduler(); + (ContainerScheduler) containerManager.getContainerScheduler(); // Ensure all containers are properly queued. int numTries = 30; while ((containerScheduler.getNumQueuedContainers() < 6) && @@ -771,7 +777,8 @@ public void testQueueShedding() throws Exception { ContainerQueuingLimit containerQueuingLimit = ContainerQueuingLimit .newInstance(); containerQueuingLimit.setMaxQueueLength(2); - containerScheduler.updateQueuingLimit(containerQueuingLimit); + containerScheduler.updateOpportunisticContainerQueuingLimit( + containerQueuingLimit); numTries = 30; while ((containerScheduler.getNumQueuedContainers() > 2) && (numTries-- > 0)) { @@ -853,7 +860,7 @@ public void testContainerDeQueuedAfterAMKill() throws Exception { containerManager.startContainers(allRequests); ContainerScheduler containerScheduler = - containerManager.getContainerScheduler(); + (ContainerScheduler) containerManager.getContainerScheduler(); // Ensure both containers are properly queued. 
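With getContainerScheduler() now typed as AbstractContainerScheduler, every test that inspects queue sizes has to downcast to the concrete ContainerScheduler, as the hunks above do. A minimal sketch of that pattern, using only methods that already appear in this patch (the helper class and method name are illustrative, not part of the patch):

import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.AbstractContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
import org.junit.Assert;

final class SchedulerTestUtil {
  private SchedulerTestUtil() {
  }

  /** Downcast once, then assert queued/running counts in one place. */
  static void assertQueueSizes(ContainerManagerImpl cm,
      int expectedQueued, int expectedRunning) {
    AbstractContainerScheduler scheduler = cm.getContainerScheduler();
    // Queue introspection is only defined on the concrete ContainerScheduler,
    // hence the instanceof guard before the cast.
    Assert.assertTrue(scheduler instanceof ContainerScheduler);
    ContainerScheduler cs = (ContainerScheduler) scheduler;
    Assert.assertEquals(expectedQueued, cs.getNumQueuedContainers());
    Assert.assertEquals(expectedRunning, cs.getNumRunningContainers());
  }
}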
int numTries = 30; while ((containerScheduler.getNumQueuedContainers() < 2) && @@ -1182,7 +1189,7 @@ public void testPromotionOfOpportunisticContainers() throws Exception { } ContainerScheduler containerScheduler = - containerManager.getContainerScheduler(); + (ContainerScheduler) containerManager.getContainerScheduler(); // Ensure two containers are properly queued. Assert.assertEquals(1, containerScheduler.getNumQueuedContainers()); Assert.assertEquals(0, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java index 2ae8b97..73f4d16 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java @@ -80,7 +80,7 @@ .thenReturn(appId); when(containerId.getContainerId()).thenReturn(123L); doNothing().when(allocationBasedResourceUtilizationTracker) - .addContainerResources(container); + .containerLaunched(container); } @After public void tearDown() { @@ -102,7 +102,7 @@ assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + .containerLaunched(container); } /*Test if a container is recovered as QUEUED, OPPORTUNISTIC, @@ -121,7 +121,7 @@ assertEquals(1, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + .containerLaunched(container); } /*Test if a container is recovered as PAUSED, GUARANTEED, @@ -140,7 +140,7 @@ assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + .containerLaunched(container); } /*Test if a container is recovered as PAUSED, OPPORTUNISTIC, @@ -159,7 +159,7 @@ assertEquals(1, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + .containerLaunched(container); } /*Test if a container is recovered as LAUNCHED, GUARANTEED, @@ -178,7 +178,7 @@ assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(1, spy.getNumRunningContainers()); Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1)) - .addContainerResources(container); + .containerLaunched(container); } /*Test if a container is recovered as LAUNCHED, OPPORTUNISTIC, @@ -197,7 +197,7 @@ assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(1, spy.getNumRunningContainers()); Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1)) - .addContainerResources(container); + .containerLaunched(container); } /*Test if a container is 
recovered as REQUESTED, GUARANTEED, @@ -216,7 +216,7 @@ assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + .containerLaunched(container); } /*Test if a container is recovered as REQUESTED, OPPORTUNISTIC, @@ -235,7 +235,7 @@ assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + .containerLaunched(container); } /*Test if a container is recovered as COMPLETED, GUARANTEED, @@ -254,7 +254,7 @@ assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + .containerLaunched(container); } /*Test if a container is recovered as COMPLETED, OPPORTUNISTIC, @@ -273,7 +273,7 @@ assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + .containerLaunched(container); } /*Test if a container is recovered as GUARANTEED but no executionType set, @@ -291,7 +291,7 @@ assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + .containerLaunched(container); } /*Test if a container is recovered as PAUSED but no executionType set, @@ -309,6 +309,6 @@ assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + .containerLaunched(container); } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestOpportunisticContainerRecovery.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestOpportunisticContainerRecovery.java new file mode 100644 index 0000000..3f61f68 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestOpportunisticContainerRecovery.java @@ -0,0 +1,300 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService + .RecoveredContainerStatus; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests to verify that the {@link OpportunisticContainerScheduler} is + * able to recover active containers based on RecoveredContainerStatus + * and ExecutionType. + */ +public class TestOpportunisticContainerRecovery { + @Mock + private NMContext context; + + @Mock private NodeManagerMetrics metrics; + + @Mock private AsyncDispatcher dispatcher; + + @Mock private ContainerTokenIdentifier token; + + @Mock private ContainerImpl container; + + @Mock private ApplicationId appId; + + @Mock private ApplicationAttemptId appAttemptId; + + @Mock private ContainerId containerId; + + @Mock private AllocationBasedResourceUtilizationTracker + allocationBasedResourceUtilizationTracker; + + @InjectMocks + private OpportunisticContainerScheduler containerScheduler = + new OpportunisticContainerScheduler(context, dispatcher, metrics); + + private OpportunisticContainerScheduler spy; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + spy = spy(containerScheduler); + + when(containerId.getApplicationAttemptId()).thenReturn(appAttemptId); + when(containerId.getApplicationAttemptId().getApplicationId()) + .thenReturn(appId); + when(containerId.getContainerId()).thenReturn(123L); + when(container.getContainerId()).thenReturn(containerId); + doNothing().when(allocationBasedResourceUtilizationTracker) + .containerLaunched(container); + } + + /** + * Test the recovery of a QUEUED GUARANTEED container. + * It should be launched immediately.
+ */ + @Test + public void testRecoverContainerQueuedGuaranteed() + throws IllegalArgumentException, IllegalAccessException { + RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED; + when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); + when(container.getContainerTokenIdentifier()).thenReturn(token); + spy.recoverActiveContainer(container, rcs); + + verify(allocationBasedResourceUtilizationTracker, times(1)) + .containerLaunched(container); + verify(container, times(1)).sendLaunchEvent(); + assertEquals(1, spy.getNumberOfRunningContainers()); + } + + /** + * Test the recovery of a QUEUED OPPORTUNISTIC container. + * It should be queued. + */ + @Test public void testRecoverContainerQueuedOpportunistic() + throws IllegalArgumentException, IllegalAccessException { + RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED; + when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); + when(container.getContainerTokenIdentifier()).thenReturn(token); + spy.recoverActiveContainer(container, rcs); + + verify(allocationBasedResourceUtilizationTracker, times(0)) + .containerLaunched(container); + OpportunisticContainersStatus status = + containerScheduler.getOpportunisticContainersStatus(); + assertEquals(1, status.getQueuedOpportContainers()); + assertEquals(0, status.getRunningOpportContainers()); + assertEquals(0, spy.getNumberOfRunningContainers()); + } + + /** + * Test the recovery of a LAUNCHED GUARANTEED container. + */ + @Test public void testRecoverContainerLaunchedGuaranteed() + throws IllegalArgumentException, IllegalAccessException { + RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED; + when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); + when(container.getContainerTokenIdentifier()).thenReturn(token); + spy.recoverActiveContainer(container, rcs); + + verify(allocationBasedResourceUtilizationTracker, times(1)) + .containerLaunched(container); + assertEquals(1, spy.getNumberOfRunningContainers()); + } + + /** + * Test the recovery of a LAUNCHED OPPORTUNISTIC container. + */ + @Test public void testRecoverContainerLaunchedOpportunistic() + throws IllegalArgumentException, IllegalAccessException { + RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED; + when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); + when(container.getContainerTokenIdentifier()).thenReturn(token); + spy.recoverActiveContainer(container, rcs); + + verify(allocationBasedResourceUtilizationTracker, times(1)) + .containerLaunched(container); + assertEquals(1, spy.getNumberOfRunningContainers()); + } + + /** + * Test the recovery of a REQUESTED GUARANTEED container. + * It should be ignored. + */ + @Test public void testRecoverContainerRequestedGuaranteed() + throws IllegalArgumentException, IllegalAccessException { + RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED; + when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); + when(container.getContainerTokenIdentifier()).thenReturn(token); + spy.recoverActiveContainer(container, rcs); + + assertEquals(0, spy.getNumberOfRunningContainers()); + verify(allocationBasedResourceUtilizationTracker, times(0)) + .containerLaunched(container); + } + + /** + * Test the recovery of a REQUESTED OPPORTUNISTIC container. + * It should be ignored.
+ */ + @Test public void testRecoverContainerRequestedOpportunistic() + throws IllegalArgumentException, IllegalAccessException { + RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED; + when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); + when(container.getContainerTokenIdentifier()).thenReturn(token); + spy.recoverActiveContainer(container, rcs); + + OpportunisticContainersStatus status = + spy.getOpportunisticContainersStatus(); + assertEquals(0, status.getRunningOpportContainers()); + assertEquals(0, status.getQueuedOpportContainers()); + verify(allocationBasedResourceUtilizationTracker, times(0)) + .containerLaunched(container); + } + + /** + * Test the recovery of a PAUSED GUARANTEED container. + * It should be ignored. + */ + @Test public void testRecoverContainerPausedGuaranteed() + throws IllegalArgumentException, IllegalAccessException { + RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED; + when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); + when(container.getContainerTokenIdentifier()).thenReturn(token); + spy.recoverActiveContainer(container, rcs); + + assertEquals(0, spy.getNumberOfRunningContainers()); + verify(allocationBasedResourceUtilizationTracker, times(0)) + .containerLaunched(container); + } + + /** + * Test the recovery of a PAUSED OPPORTUNISTIC container. + * It should be ignored. + */ + @Test public void testRecoverContainerPausedOpportunistic() + throws IllegalArgumentException, IllegalAccessException { + RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED; + when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); + when(container.getContainerTokenIdentifier()).thenReturn(token); + spy.recoverActiveContainer(container, rcs); + + OpportunisticContainersStatus status = + spy.getOpportunisticContainersStatus(); + assertEquals(0, status.getRunningOpportContainers()); + assertEquals(0, status.getQueuedOpportContainers()); + verify(allocationBasedResourceUtilizationTracker, times(0)) + .containerLaunched(container); + } + + /** + * Test the recovery of a COMPLETED GUARANTEED container. + * It should be ignored. + */ + @Test public void testRecoverContainerCompletedGuaranteed() + throws IllegalArgumentException, IllegalAccessException { + RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED; + when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); + when(container.getContainerTokenIdentifier()).thenReturn(token); + spy.recoverActiveContainer(container, rcs); + + assertEquals(0, spy.getNumberOfRunningContainers()); + verify(allocationBasedResourceUtilizationTracker, times(0)) + .containerLaunched(container); + } + + /** + * Test the recovery of a COMPLETED OPPORTUNISTIC container. + * It should be ignored. + */ + @Test public void testRecoverContainerCompletedOpportunistic() + throws IllegalArgumentException, IllegalAccessException { + RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED; + when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); + when(container.getContainerTokenIdentifier()).thenReturn(token); + spy.recoverActiveContainer(container, rcs); + + OpportunisticContainersStatus status = + spy.getOpportunisticContainersStatus(); + assertEquals(0, status.getRunningOpportContainers()); + assertEquals(0, status.getQueuedOpportContainers()); + verify(allocationBasedResourceUtilizationTracker, times(0)) + .containerLaunched(container); + } + + /** + * Test the recovery of a QUEUED container without any ExecutionType. 
+ * It should be ignored. + */ + @Test public void testContainerQueuedNoExecType() + throws IllegalArgumentException, IllegalAccessException { + RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED; + when(container.getContainerTokenIdentifier()).thenReturn(token); + spy.recoverActiveContainer(container, rcs); + + OpportunisticContainersStatus status = + spy.getOpportunisticContainersStatus(); + assertEquals(0, status.getRunningOpportContainers()); + assertEquals(0, status.getQueuedOpportContainers()); + verify(allocationBasedResourceUtilizationTracker, times(0)) + .containerLaunched(container); + } + + /** + * Test the recovery of a PAUSED container without any ExecutionType. + * It should be ignored. + */ + @Test public void testContainerPausedNoExecType() + throws IllegalArgumentException, IllegalAccessException { + RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED; + when(container.getContainerTokenIdentifier()).thenReturn(token); + spy.recoverActiveContainer(container, rcs); + + OpportunisticContainersStatus status = + spy.getOpportunisticContainersStatus(); + assertEquals(0, status.getRunningOpportContainers()); + assertEquals(0, status.getQueuedOpportContainers()); + verify(allocationBasedResourceUtilizationTracker, times(0)) + .containerLaunched(container); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestOpportunisticContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestOpportunisticContainerScheduler.java new file mode 100644 index 0000000..e649d9c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestOpportunisticContainerScheduler.java @@ -0,0 +1,1294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import com.google.common.base.Supplier; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerSubState; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.exceptions.ConfigurationException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; + +import org.eclipse.jetty.util.ConcurrentHashSet; + +import org.junit.Assert; +import org.junit.Test; + +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public 
class TestOpportunisticContainerScheduler + extends BaseContainerManagerTest { + + private static final int NM_OPPORTUNISTIC_QUEUE_LIMIT = 3; + private static final int NM_CONTAINERS_VCORES = 4; + private static final int NM_CONTAINERS_MEMORY_MB = 2048; + + static { + LOG = LoggerFactory.getLogger(TestOpportunisticContainerScheduler.class); + } + + public TestOpportunisticContainerScheduler() + throws UnsupportedFileSystemException { + } + + @Override + protected ContainerExecutor createContainerExecutor() { + DefaultContainerExecutor exec = + new LongRunningContainerSimulatingContainerExecutor(); + exec.setConf(conf); + return exec; + } + + @Override + protected ContainerManagerImpl createContainerManager( + DeletionService delSrvc) { + return new LongRunningContainerSimulatingContainersManager( + context, exec, delSrvc, nodeStatusUpdater, metrics, dirsHandler, user) { + // use OpportunisticContainerScheduler + @Override + protected AbstractContainerScheduler + createContainerScheduler(Context cntxt) { + return new OpportunisticContainerSchedulerForTest( + context, dispatcher, metrics); + } + }; + } + + @Override + public void setup() throws IOException { + conf.setInt( + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, + NM_OPPORTUNISTIC_QUEUE_LIMIT); + conf.setFloat( + YarnConfiguration.NM_OVERALLOCATION_CPU_UTILIZATION_THRESHOLD, + 0.75f); + conf.setFloat( + YarnConfiguration.NM_OVERALLOCATION_MEMORY_UTILIZATION_THRESHOLD, + 0.75f); + // disable container monitor thread + conf.setBoolean(YarnConfiguration.NM_CONTAINER_MONITOR_ENABLED, false); + + super.setup(); + } + + /** + * Start running one GUARANTEED container and queue two OPPORTUNISTIC ones. + * Try killing one of the two queued containers. + * @throws Exception + */ + @Test + public void testStopQueuedContainer() throws Exception { + containerManager.start(); + + StartContainersRequest allRequests = StartContainersRequest.newInstance( + new ArrayList() { { + add(createStartContainerRequest(0, + BuilderUtils.newResource(2048, 1), true)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(512, 1), false)); + add(createStartContainerRequest(2, + BuilderUtils.newResource(512, 1), false)); + } } + ); + containerManager.startContainers(allRequests); + + waitForOpportunisticContainerToBeQueued(createContainerId(1)); + waitForOpportunisticContainerToBeQueued(createContainerId(2)); + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(0), ContainerSubState.RUNNING); + + // Assert there is initially one container running and two queued. + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.SCHEDULED); + put(createContainerId(2), ContainerSubState.SCHEDULED); + } + }); + + // Stop one of the two queued containers. + StopContainersRequest stopRequest = StopContainersRequest. + newInstance(Arrays.asList(createContainerId(1))); + containerManager.stopContainers(stopRequest); + + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(1), ContainerSubState.DONE); + + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.DONE); + put(createContainerId(2), ContainerSubState.SCHEDULED); + } + }); + } + + /** + * 1. Submit a long running GUARANTEED container to hog all NM resources. + * 2. Submit 3 OPPORTUNISTIC containers, all of which will be queued. + * 3.
Update the Queue Limit to 2. + * 4. Ensure only 2 containers remain in the Queue, and 1 is de-queued. + * @throws Exception + */ + @Test + public void testQueueShedding() throws Exception { + containerManager.start(); + + // start a long running GUARANTEED container to hog all NM resources. + StartContainersRequest allRequests = StartContainersRequest.newInstance( + new ArrayList() { { + add(createStartContainerRequest(0, + BuilderUtils.newResource(2048, 1), true)); + } } + ); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(0), ContainerSubState.RUNNING); + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, 0.25f)); + + // try to start 3 OPPORTUNISTIC containers, all of which will be queued. + allRequests = StartContainersRequest.newInstance( + new ArrayList() { { + add(createStartContainerRequest(1, + BuilderUtils.newResource(512, 1), false)); + add(createStartContainerRequest(2, + BuilderUtils.newResource(512, 1), false)); + add(createStartContainerRequest(3, + BuilderUtils.newResource(512, 1), false)); + } } + ); + containerManager.startContainers(allRequests); + waitForOpportunisticContainerToBeQueued(createContainerId(1)); + waitForOpportunisticContainerToBeQueued(createContainerId(2)); + waitForOpportunisticContainerToBeQueued(createContainerId(3)); + + // update the queue limit to 2 + ContainerQueuingLimit containerQueuingLimit = ContainerQueuingLimit + .newInstance(); + containerQueuingLimit.setMaxQueueLength(2); + containerManager.getContainerScheduler().updateOpportunisticContainerQueuingLimit( + containerQueuingLimit); + + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(3), ContainerSubState.DONE); + + // check that one OPPORTUNISTIC container is de-queued and two stay queued + List statList = new ArrayList<>(); + for (int i = 1; i < 4; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + + int deQueuedContainers = 0; + int numQueuedOppContainers = 0; + for (ContainerStatus status : containerStatuses) { + if (status.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + if (status.getDiagnostics().contains( + "Container De-queued to meet NM queuing limits")) { + deQueuedContainers++; + } + if (ContainerSubState.SCHEDULED == status.getContainerSubState()) { + numQueuedOppContainers++; + } + } + } + Assert.assertEquals(1, deQueuedContainers); + Assert.assertEquals(2, numQueuedOppContainers); + } + + /** + * Start one GUARANTEED and one OPPORTUNISTIC container, which in aggregate + * do not exceed the capacity of the node. The GUARANTEED container is + * expected to start running immediately; the OPPORTUNISTIC container is + * expected to start running after the out-of-band container scheduling check + * is done.
+ */ + @Test + public void testStartMultipleContainersWithoutOverallocation() + throws Exception { + containerManager.start(); + + StartContainersRequest allRequests = StartContainersRequest.newInstance( + new ArrayList() { { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), false)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(1024, 1), true)); + } } + ); + containerManager.startContainers(allRequests); + + // OPPORTUNISTIC containers are always queued first + waitForOpportunisticContainerToBeQueued(createContainerId(0)); + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(1), ContainerSubState.RUNNING); + + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.SCHEDULED); + put(createContainerId(1), ContainerSubState.RUNNING); + } + }); + + // the GUARANTEED container is fully utilizing its resources + setContainerResourceUtilization( + ResourceUtilization.newInstance(1024, 0, 0.5f)); + // start the OPPORTUNISTIC container in an out-of-band fashion + ((LongRunningContainerSimulatingContainersManager)containerManager) + .attemptToStartOpportunisticContainers(); + + // the OPPORTUNISTIC container is expected to be launched and start running + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(1), ContainerSubState.RUNNING); + + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + } + }); + } + + /** + * Start one GUARANTEED and one OPPORTUNISTIC container whose utilization + * is very low relative to their resource requests, resulting in a low node + * utilization. Then start another OPPORTUNISTIC container that requests + * more than what's left unallocated on the node. Due to overallocation + * being turned on and node utilization being low, the second OPPORTUNISTIC + * container is also expected to be launched.
+ */ + @Test + public void testStartOppContainersWithPartialOverallocationLowUtilization() + throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), true)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(824, 1), false)); + } + } + )); + + // the GUARANTEED container is expected to start running and the OPPORTUNISTIC + // one is supposed to be queued + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(0), ContainerSubState.RUNNING); + waitForOpportunisticContainerToBeQueued(createContainerId(1)); + + // the GUARANTEED container's utilization is very low + setContainerResourceUtilization( + ResourceUtilization.newInstance(256, 0, 1.0f/8)); + ((LongRunningContainerSimulatingContainersManager) containerManager) + .attemptToStartOpportunisticContainers(); + + // the OPPORTUNISTIC container is expected to be running + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(1), ContainerSubState.RUNNING); + + setContainerResourceUtilization( + ResourceUtilization.newInstance(512, 0, 1.0f/6)); + + // start a container that requests more than what's left unallocated + // 512 + 1024 + 824 > 2048 + containerManager.startContainers(StartContainersRequest.newInstance( + Collections.singletonList( + createStartContainerRequest(2, + BuilderUtils.newResource(512, 1), false)) + )); + waitForOpportunisticContainerToBeQueued(createContainerId(2)); + + ((LongRunningContainerSimulatingContainersManager) containerManager) + .attemptToStartOpportunisticContainers(); + + // this OPPORTUNISTIC container is expected to be launched because there are + // (memory: 1024, vcore: 0.625) available based on over-allocation threshold + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(2), ContainerSubState.RUNNING); + + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + } + }); + } + + /** + * Start one GUARANTEED and one OPPORTUNISTIC container that utilize most + * of the resources they requested, resulting in a high node utilization. + * Then start another OPPORTUNISTIC container that requests more than what's + * left unallocated on the node. Because of the high resource utilization on + * the node, the projected utilization, if we were to start the second + * OPPORTUNISTIC container, would go over the NM overallocation + * threshold, so the second OPPORTUNISTIC container is expected to be queued.
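The "(memory: 1024 ...)" comment above follows from the thresholds set in setup(): with 2048 MB of node memory and a 0.75 memory threshold, the overallocation ceiling is 1536 MB, so 512 MB of reported usage leaves 1024 MB of headroom. A hedged sketch of that arithmetic (the class and method are illustrative, not the utilization tracker's real API):

import org.apache.hadoop.yarn.api.records.ResourceUtilization;

final class OverallocationMath {
  private OverallocationMath() {
  }

  /**
   * Memory headroom available to OPPORTUNISTIC containers under
   * overallocation: threshold * capacity minus actual reported usage.
   */
  static long memoryHeadroomMb(long nodeCapacityMb, float threshold,
      ResourceUtilization current) {
    long ceilingMb = (long) (nodeCapacityMb * threshold);
    return Math.max(0, ceilingMb - current.getPhysicalMemory());
  }

  public static void main(String[] args) {
    ResourceUtilization used = ResourceUtilization.newInstance(512, 0, 1.0f / 6);
    // 0.75 * 2048 - 512 = 1024 MB left for opportunistic launches.
    System.out.println(memoryHeadroomMb(2048, 0.75f, used));
  }
}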
+ BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(0), ContainerSubState.RUNNING); + waitForOpportunisticContainerToBeQueued(createContainerId(1)); + + // try to launch the OPPORTUNISTIC container + setContainerResourceUtilization( + ResourceUtilization.newInstance(1024, 0, 1.0f/8)); + ((LongRunningContainerSimulatingContainersManager) containerManager) + .attemptToStartOpportunisticContainers(); + // the OPPORTUNISTIC container is expected to be running + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(1), ContainerSubState.RUNNING); + + // the aggregate utilization of the two containers is very high + setContainerResourceUtilization( + ResourceUtilization.newInstance(1500, 0, 1.0f/6)); + + // try to start a container that requests more than what's left unallocated + // 512 + 1024 + 824 > 2048 + containerManager.startContainers(StartContainersRequest.newInstance( + Collections.singletonList( + createStartContainerRequest(2, + BuilderUtils.newResource(512, 1), false)) + )); + waitForOpportunisticContainerToBeQueued(createContainerId(2)); + + ((LongRunningContainerSimulatingContainersManager) containerManager) + .attemptToStartOpportunisticContainers(); + + // this container will not start running because there is not + // enough resource available at the moment either in terms of + // resources unallocated or in terms of the actual utilization + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.SCHEDULED); + + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.SCHEDULED); + } + }); + } + + /** + * Start two GUARANTEED containers which in aggregate take up the whole node + * capacity, yet whose utilization is low relative to their resource requests, + * resulting in a low node resource utilization. Then try to start another + * OPPORTUNISTIC container. Because the resource utilization across the node + * is low and over-allocation is turned on, the OPPORTUNISTIC container is + * expected to be launched even though there are no resources left unallocated. + */ + @Test + public void testStartOppContainersWithOverallocationLowUtilization() + throws Exception { + containerManager.start(); + + // try to start two GUARANTEED containers that take up the whole node + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), true)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(1024, 1), true)); + } + } + )); + // both GUARANTEED containers are expected to be running immediately + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + + // the current utilization of the two GUARANTEED containers is low + setContainerResourceUtilization( + ResourceUtilization.newInstance(800, 0, 1.0f/8)); + + // try to start an OPPORTUNISTIC container when there are no resources left + // unallocated.
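The contrast between the SCHEDULED outcome above and the launch attempted below comes down to a two-part admission rule these tests keep exercising: an OPPORTUNISTIC container starts if its request fits in unallocated capacity, or if overallocation is enabled and the projected utilization stays under the threshold. A sketch under those assumptions (names are illustrative, not the scheduler's actual code):

final class OpportunisticAdmission {
  private OpportunisticAdmission() {
  }

  /** Illustrative admission rule for an OPPORTUNISTIC container. */
  static boolean canLaunch(long unallocatedMb, long requestMb, long usedMb,
      long capacityMb, float overallocationThreshold) {
    // Safe case: the request still fits in unallocated capacity, so it can
    // be launched no matter how high the current utilization is.
    if (requestMb <= unallocatedMb) {
      return true;
    }
    // Overallocation case: launch only if the projected usage stays under
    // the configured threshold (0.75 of capacity in these tests).
    long projectedMb = usedMb + requestMb;
    return projectedMb <= (long) (capacityMb * overallocationThreshold);
  }
}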
+ containerManager.startContainers(StartContainersRequest.newInstance( + Collections.singletonList( + createStartContainerRequest(2, + BuilderUtils.newResource(512, 1), false)) + )); + waitForOpportunisticContainerToBeQueued(createContainerId(2)); + + ((LongRunningContainerSimulatingContainersManager) containerManager) + .attemptToStartOpportunisticContainers(); + + // the OPPORTUNISTIC container is expected to be started because there are resources + // available since the actual utilization is very low + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + } + }); + } + + + /** + * Start two GUARANTEED containers which in aggregate take up the whole node + * capacity and fully utilize the resources they requested. Then try to start + * four OPPORTUNISTIC containers of which three will be queued and one will be + * killed because the max queue length is 3. + */ + @Test + public void testQueueOppContainersWithFullUtilization() throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), true)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(1024, 1), true)); + } + } + )); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + + // the containers are fully utilizing their resources + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, 1.0f/8)); + + // start more OPPORTUNISTIC containers than what the OPPORTUNISTIC container + // queue can hold when there is no unallocated resource left. + List moreContainerRequests = + new ArrayList<>(NM_OPPORTUNISTIC_QUEUE_LIMIT + 1); + for (int a = 0; a < NM_OPPORTUNISTIC_QUEUE_LIMIT + 1; a++) { + moreContainerRequests.add( + createStartContainerRequest(2 + a, + BuilderUtils.newResource(512, 1), false)); + } + containerManager.startContainers( + StartContainersRequest.newInstance(moreContainerRequests)); + for (int a = 0; a < NM_OPPORTUNISTIC_QUEUE_LIMIT; a++) { + waitForOpportunisticContainerToBeQueued(createContainerId(2 + a)); + } + + ((LongRunningContainerSimulatingContainersManager) containerManager) + .attemptToStartOpportunisticContainers(); + + // All OPPORTUNISTIC containers but the last one should be queued. + // The last OPPORTUNISTIC container to launch should be killed.
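The kill asserted below comes purely from the queue cap: with NM_OPPORTUNISTIC_QUEUE_LIMIT set to 3 in setup(), the fourth queued OPPORTUNISTIC request has nowhere to go. A toy sketch of that behavior (the Deque stands in for the scheduler's internal queue and is not its actual data structure):

import java.util.ArrayDeque;
import java.util.Deque;

final class OppQueueCapDemo {
  public static void main(String[] args) {
    final int maxQueueLength = 3;    // NM_OPPORTUNISTIC_QUEUE_LIMIT above
    Deque<String> queue = new ArrayDeque<>();
    for (int i = 2; i <= 5; i++) {   // container ids 2..5, as in the test
      if (queue.size() < maxQueueLength) {
        queue.add("container_" + i); // queued, i.e. SCHEDULED
      } else {
        System.out.println("container_" + i + " killed: queue is full");
      }
    }
    System.out.println("queued: " + queue); // containers 2, 3 and 4
  }
}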
+ BaseContainerManagerTest.waitForContainerState( + containerManager, createContainerId(NM_OPPORTUNISTIC_QUEUE_LIMIT + 2), + ContainerState.COMPLETE); + + HashMap expectedContainerStatus = + new HashMap<>(); + expectedContainerStatus.put( + createContainerId(0), ContainerSubState.RUNNING); + expectedContainerStatus.put( + createContainerId(1), ContainerSubState.RUNNING); + expectedContainerStatus.put( + createContainerId(NM_OPPORTUNISTIC_QUEUE_LIMIT + 2), + ContainerSubState.DONE); + for (int i = 0; i < NM_OPPORTUNISTIC_QUEUE_LIMIT; i++) { + expectedContainerStatus.put( + createContainerId(i + 2), ContainerSubState.SCHEDULED); + } + verifyContainerStatuses(expectedContainerStatus); + } + + /** + * Start two GUARANTEED containers that together do not take up the + * whole node. Then try to start one OPPORTUNISTIC container that will + * fit into the remaining unallocated space on the node. + * The OPPORTUNISTIC container is expected to start even though the + * current node utilization is above the NM overallocation threshold, + * because it's always safe to launch containers as long as the node + * has not been fully allocated. + */ + @Test + public void testStartOppContainerWithHighUtilizationNoOverallocation() + throws Exception { + containerManager.start(); + + // try to start two GUARANTEED containers + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1200, 1), true)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(400, 1), true)); + } + } + )); + // both containers are expected to be running + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + + // the containers' utilization is above the over-allocation threshold + setContainerResourceUtilization( + ResourceUtilization.newInstance(1600, 0, 1.0f/2)); + + // try to start an OPPORTUNISTIC container that can just fit in the + // remaining unallocated space + containerManager.startContainers(StartContainersRequest.newInstance( + Collections.singletonList( + createStartContainerRequest(2, + BuilderUtils.newResource(400, 1), false)) + )); + waitForOpportunisticContainerToBeQueued(createContainerId(2)); + + ((LongRunningContainerSimulatingContainersManager) containerManager) + .attemptToStartOpportunisticContainers(); + + // the OPPORTUNISTIC container can be safely launched even though + // the container utilization is above the NM overallocation threshold + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + } + }); + } + + /** + * First start two OPPORTUNISTIC containers whose utilization is low relative + * to the resources they requested, resulting in a low node utilization. Then + * try to start a GUARANTEED container which requests more than what's left + * unallocated on the node. Because the node utilization is low and NM + * overallocation is turned on, the GUARANTEED container is expected to be + * started immediately without killing any running OPPORTUNISTIC containers.
+ *
+ * TODO: add preemption check when YARN-6672 is done
+ */
+ @Test
+ public void testKillNoOppContainersWithPartialOverallocationLowUtilization()
+ throws Exception {
+ containerManager.start();
+
+ // try to start two OPPORTUNISTIC containers
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(824, 1), false));
+ }
+ }
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(0));
+ waitForOpportunisticContainerToBeQueued(createContainerId(1));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // both OPPORTUNISTIC containers are expected to be running
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // the current aggregate containers utilization is low
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(512, 0, 1.0f/8));
+
+ // start a GUARANTEED container that requests more than what's left
+ // unallocated on the node: (512 + 1024 + 824) > 2048
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), true))
+ ));
+
+ // the GUARANTEED container is expected to be launched immediately
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start two OPPORTUNISTIC containers whose utilization will be high relative
+ * to the resources they requested, resulting in a high node utilization.
+ * Then try to start a GUARANTEED container which requests more than what's
+ * left unallocated on the node. Because the node is under high utilization,
+ * the second OPPORTUNISTIC container is expected to be killed in order to
+ * make room for the GUARANTEED container.
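+ * (Until preemption is wired in via YARN-6672, the test below only
+ * verifies that the GUARANTEED container starts; it does not yet assert
+ * that an OPPORTUNISTIC container is killed.)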
+ *
+ * TODO: add preemption check when YARN-6672 is done
+ */
+ @Test
+ public void testKillNoOppContainersWithPartialOverallocationHighUtilization()
+ throws Exception {
+ containerManager.start();
+
+ // try to start two OPPORTUNISTIC containers
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(824, 1), false));
+ }
+ }
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(0));
+ waitForOpportunisticContainerToBeQueued(createContainerId(1));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // both OPPORTUNISTIC containers are expected to be running
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // the current aggregate containers utilization is high
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1800, 0, 1.0f/8));
+
+ // try to start a GUARANTEED container that requests more than what's left
+ // unallocated on the node: (512 + 1024 + 824) > 2048
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), true))
+ ));
+
+ // the GUARANTEED container is expected to be launched immediately
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ // TODO when preemption is added, verify enough OPPORTUNISTIC containers
+ // are killed.
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start three OPPORTUNISTIC containers which in aggregate exceed the
+ * capacity of the node, yet whose utilization is low relative
+ * to the resources they requested, resulting in a low node utilization.
+ * Then try to start a GUARANTEED container. Even though the node has
+ * nothing left unallocated, the GUARANTEED container is expected to start
+ * immediately without killing any running OPPORTUNISTIC containers,
+ * because the node utilization is very low and overallocation is turned on.
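+ * (The three OPPORTUNISTIC containers are allocated 3 x 1024 MB against
+ * the 2048 MB node capacity, yet the reported utilization stays at
+ * 1024 MB, leaving room to start the 512 MB GUARANTEED container.)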
+ *
+ * TODO: add preemption check when YARN-6672 is done
+ */
+ @Test
+ public void testKillNoOppContainersWithOverallocationLowUtilization()
+ throws Exception {
+ containerManager.start();
+
+ // try to start two OPPORTUNISTIC containers
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(1024, 1), false));
+ }
+ }
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(0));
+ waitForOpportunisticContainerToBeQueued(createContainerId(1));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // both OPPORTUNISTIC containers are expected to start running
+ // because the containers utilization is low (0 at this point)
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // try to start another OPPORTUNISTIC container
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(2,
+ BuilderUtils.newResource(1024, 1), false));
+ }
+ }
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(2));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // the OPPORTUNISTIC container is also expected to start running
+ // because the aggregate containers utilization is still 0
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ // the current aggregate containers utilization is low after launching
+ // three OPPORTUNISTIC containers
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1024, 0, 1.0f/8));
+
+ // try to start a GUARANTEED container that requests more than what's left
+ // unallocated on the node: (512 + 1024 + 1024 + 1024) > 2048
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(3,
+ BuilderUtils.newResource(512, 1), true))
+ ));
+
+ // the GUARANTEED container is expected to be launched immediately
+ // without killing any OPPORTUNISTIC containers
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(3), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ put(createContainerId(3), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start two OPPORTUNISTIC containers followed by one GUARANTEED container,
+ * which in aggregate exceed the capacity of the node. The first two
+ * OPPORTUNISTIC containers use almost no resources whereas the GUARANTEED
+ * one utilizes nearly all of the resources it requested. Then try to start
+ * two more OPPORTUNISTIC containers, which are expected to be queued
+ * immediately. Upon the completion of the resource-usage-heavy GUARANTEED
+ * container, both queued OPPORTUNISTIC containers are expected to start.
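+ * (The utilization is pinned at the overallocation threshold while the
+ * GUARANTEED container runs, which keeps the two new OPPORTUNISTIC
+ * containers queued; once it completes and the utilization drops, both
+ * are launched.)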
+ */
+ @Test
+ public void testStartOppContainersUponContainerCompletion() throws Exception {
+ containerManager.start();
+
+ // try to start two OPPORTUNISTIC containers and one GUARANTEED container
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(2,
+ BuilderUtils.newResource(1024, 1), true));
+ }
+ }
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(0));
+ waitForOpportunisticContainerToBeQueued(createContainerId(1));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // All three containers are expected to start immediately
+ // because the node utilization is low (0 at this point)
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ // the aggregate containers utilization is at the overallocation threshold
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1536, 0, 1.0f/2));
+
+ // try to start two OPPORTUNISTIC containers
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(3,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(4,
+ BuilderUtils.newResource(512, 1), false));
+ }
+ }
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(3));
+ waitForOpportunisticContainerToBeQueued(createContainerId(4));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // the two new OPPORTUNISTIC containers are expected to be queued
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(3), ContainerSubState.SCHEDULED);
+ put(createContainerId(4), ContainerSubState.SCHEDULED);
+ }
+ });
+
+ // the GUARANTEED container completes, releasing its resources
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(100, 0, 1.0f/5));
+ allowContainerToSucceed(2);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.DONE);
+
+ // check again to see if the queued OPPORTUNISTIC containers can be launched
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+ // the two OPPORTUNISTIC containers are expected to start together
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(3), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(4), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.DONE);
+ put(createContainerId(3), ContainerSubState.RUNNING);
+ put(createContainerId(4), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ private void setContainerResourceUtilization(ResourceUtilization usage) {
+ ((ContainerMonitorForOverallocationTest)
+ containerManager.getContainersMonitor())
+ .setContainerResourceUsage(usage);
+ }
+
+ private void allowContainerToSucceed(int containerId) {
+ ((LongRunningContainerSimulatingContainerExecutor) this.exec)
+ .containerSucceeded(createContainerId(containerId));
+ }
+
+ /**
+ * Wait until a given container is queued by the container scheduler.
+ * This guards against a race condition where
+ * attemptToStartOpportunisticContainers() is called before the container
+ * is queued.
+ *
+ * TODO: introduce a new container state, QUEUED, to ContainerSubState
+ */
+ private void waitForOpportunisticContainerToBeQueued(ContainerId container)
+ throws Exception {
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ container, ContainerSubState.SCHEDULED);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ OpportunisticContainerSchedulerForTest containerScheduler =
+ (OpportunisticContainerSchedulerForTest)
+ containerManager.getContainerScheduler();
+ return containerScheduler.isQueued(container);
+ }
+ }, 1000, 10000);
+ }
+
+ protected StartContainerRequest createStartContainerRequest(
+ int containerId, Resource resource, boolean isGuaranteed)
+ throws IOException {
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ ExecutionType executionType = isGuaranteed ?
+ ExecutionType.GUARANTEED : ExecutionType.OPPORTUNISTIC;
+ Token containerToken = createContainerToken(
+ createContainerId(containerId),
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user, resource,
+ context.getContainerTokenSecretManager(), null, executionType);
+
+ return StartContainerRequest.newInstance(
+ containerLaunchContext, containerToken);
+ }
+
+ protected void verifyContainerStatuses(
+ Map<ContainerId, ContainerSubState> expected)
+ throws IOException, YarnException {
+ List<ContainerId> statList = new ArrayList<>(expected.keySet());
+ GetContainerStatusesRequest statRequest =
+ GetContainerStatusesRequest.newInstance(statList);
+ List<ContainerStatus> containerStatuses = containerManager
+ .getContainerStatuses(statRequest).getContainerStatuses();
+
+ for (ContainerStatus status : containerStatuses) {
+ ContainerId containerId = status.getContainerId();
+ Assert.assertEquals(containerId + " is in unexpected state",
+ expected.get(containerId), status.getContainerSubState());
+ }
+ }
+
+ /**
+ * A container manager that returns a dummy container pid while it's
+ * cleaning up running containers. Used along with
+ * LongRunningContainerSimulatingContainerExecutor to simulate long-running
+ * container processes for testing purposes.
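+ *
+ * A test would presumably wire it in through a createContainerManager()
+ * override, along the lines of (field names assumed from
+ * BaseContainerManagerTest):
+ * <pre>{@code
+ * return new LongRunningContainerSimulatingContainersManager(
+ *     context, exec, delSrvc, nodeStatusUpdater, metrics,
+ *     dirsHandler, user);
+ * }</pre>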
+ */
+ private static class LongRunningContainerSimulatingContainersManager
+ extends ContainerManagerImpl {
+
+ private final String user;
+
+ LongRunningContainerSimulatingContainersManager(
+ Context context, ContainerExecutor exec,
+ DeletionService deletionContext,
+ NodeStatusUpdater nodeStatusUpdater,
+ NodeManagerMetrics metrics,
+ LocalDirsHandlerService dirsHandler, String user) {
+ super(context, exec, deletionContext,
+ nodeStatusUpdater, metrics, dirsHandler);
+ this.user = user;
+ }
+
+ @Override
+ protected UserGroupInformation getRemoteUgi() throws YarnException {
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+ .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
+ .getKeyId()));
+ return ugi;
+ }
+
+ /**
+ * Create a container launcher that signals container processes
+ * with a dummy pid. The container processes are simulated in
+ * LongRunningContainerSimulatingContainerExecutor, which does
+ * not write a pid file on behalf of the containers it launches,
+ * so the pid does not matter.
+ */
+ @Override
+ protected ContainersLauncher createContainersLauncher(
+ Context context, ContainerExecutor exec) {
+ ContainerManagerImpl containerManager = this;
+ return new ContainersLauncher(context, dispatcher, exec, dirsHandler,
+ this) {
+ @Override
+ protected ContainerLaunch createContainerLaunch(
+ Application app, Container container) {
+ return new ContainerLaunch(context, getConfig(), dispatcher,
+ exec, app, container, dirsHandler, containerManager) {
+ @Override
+ protected String getContainerPid(Path pidFilePath)
+ throws Exception {
+ return "123";
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ protected AsyncDispatcher createDispatcher() {
+ return new DrainDispatcher();
+ }
+
+ @Override
+ protected ContainersMonitor createContainersMonitor(
+ ContainerExecutor exec) {
+ return new ContainerMonitorForOverallocationTest(exec,
+ dispatcher, context);
+ }
+
+ public void attemptToStartOpportunisticContainers() {
+ ((ContainerMonitorForOverallocationTest) getContainersMonitor())
+ .attemptToStartContainersUponLowUtilization();
+ }
+ }
+
+ /**
+ * A container executor that simulates long-running container processes
+ * by having container launch threads sleep until they are given a signal
+ * to finish with either a success or failure exit code.
+ */
+ private static class LongRunningContainerSimulatingContainerExecutor
+ extends DefaultContainerExecutor {
+ private ConcurrentHashMap<ContainerId, ContainerFinishLatch> containers =
+ new ConcurrentHashMap<>();
+
+ public void containerSucceeded(ContainerId containerId) {
+ ContainerFinishLatch containerFinishLatch = containers.get(containerId);
+ if (containerFinishLatch != null) {
+ containerFinishLatch.toSucceed();
+ }
+ }
+
+ public void containerFailed(ContainerId containerId) {
+ ContainerFinishLatch containerFinishLatch = containers.get(containerId);
+ if (containerFinishLatch != null) {
+ containerFinishLatch.toFail();
+ }
+ }
+
+ /**
+ * Simulate long-running container processes by having container launcher
+ * threads wait until they receive a signal to finish.
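+ * The launch thread polls its ContainerFinishLatch every 100ms and
+ * returns the latch's exit code once toSucceed() or toFail() has been
+ * called.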
+ */
+ @Override
+ public int launchContainer(ContainerStartContext ctx)
+ throws IOException, ConfigurationException {
+ ContainerId container = ctx.getContainer().getContainerId();
+ containers.putIfAbsent(container, new ContainerFinishLatch(container));
+
+ // simulate a long-running container process by having the
+ // container launch thread sleep until it's given a
+ // signal to finish with an exit code.
+ while (!containers.get(container).toProceed) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ return -1;
+ }
+ }
+
+ return containers.get(container).getContainerExitCode();
+ }
+
+ /**
+ * Override signalContainer() so that simulated container processes are
+ * properly cleaned up.
+ */
+ @Override
+ public boolean signalContainer(ContainerSignalContext ctx)
+ throws IOException {
+ containerSucceeded(ctx.getContainer().getContainerId());
+ return true;
+ }
+
+ /**
+ * A signal that container launch threads wait for before exiting in order
+ * to simulate long-running container processes.
+ */
+ private static final class ContainerFinishLatch {
+ volatile boolean toProceed;
+ int exitCode;
+ ContainerId container;
+
+ ContainerFinishLatch(ContainerId containerId) {
+ exitCode = 0;
+ toProceed = false;
+ container = containerId;
+ }
+
+ void toSucceed() {
+ exitCode = 0;
+ toProceed = true;
+ }
+
+ void toFail() {
+ exitCode = -101;
+ toProceed = true;
+ }
+
+ int getContainerExitCode() {
+ // reading the volatile toProceed acts as a read barrier, making
+ // sure the exit code written before it is not stale
+ if (toProceed) {
+ LOG.debug(container + " finished with exit code: " + exitCode);
+ }
+ return exitCode;
+ }
+ }
+ }
+
+ /**
+ * A test implementation of the containers monitor that allows tests to
+ * control the reported resource utilization.
+ */
+ private static class ContainerMonitorForOverallocationTest
+ extends ContainersMonitorImpl {
+
+ private volatile ResourceUtilization containerResourceUsage =
+ ResourceUtilization.newInstance(0, 0, 0.0f);
+
+ ContainerMonitorForOverallocationTest(ContainerExecutor exec,
+ AsyncDispatcher dispatcher, Context context) {
+ super(exec, dispatcher, context);
+ }
+
+ @Override
+ public long getPmemAllocatedForContainers() {
+ return NM_CONTAINERS_MEMORY_MB * 1024 * 1024L;
+ }
+
+ @Override
+ public long getVmemAllocatedForContainers() {
+ float pmemRatio = getConfig().getFloat(
+ YarnConfiguration.NM_VMEM_PMEM_RATIO,
+ YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+ return (long) (pmemRatio * getPmemAllocatedForContainers());
+ }
+
+ @Override
+ public long getVCoresAllocatedForContainers() {
+ return NM_CONTAINERS_VCORES;
+ }
+
+ @Override
+ public ResourceUtilization getContainersUtilization() {
+ return containerResourceUsage;
+ }
+
+ @Override
+ protected void checkOverAllocationPrerequisites() throws YarnException {
+ // do nothing
+ }
+
+ public void setContainerResourceUsage(
+ ResourceUtilization containerResourceUsage) {
+ this.containerResourceUsage = containerResourceUsage;
+ }
+ }
+
+ /**
+ * A test implementation of OpportunisticContainerScheduler that allows us
+ * to check whether an OPPORTUNISTIC container has been queued.
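+ * Container IDs are recorded when enqueueOpportunisticContainer()
+ * returns true, so tests can poll isQueued() without racing the
+ * scheduler.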
+ */
+ private static class OpportunisticContainerSchedulerForTest
+ extends OpportunisticContainerScheduler {
+
+ private final ConcurrentHashSet<ContainerId>
+ queuedOpportunisticContainers = new ConcurrentHashSet<>();
+
+ public OpportunisticContainerSchedulerForTest(
+ Context context, AsyncDispatcher dispatcher,
+ NodeManagerMetrics metrics) {
+ super(context, dispatcher, metrics);
+ }
+
+ @Override
+ protected boolean enqueueOpportunisticContainer(Container container) {
+ boolean queued = super.enqueueOpportunisticContainer(container);
+ if (queued) {
+ queuedOpportunisticContainers.add(container.getContainerId());
+ }
+ return queued;
+ }
+
+ public boolean isQueued(ContainerId containerId) {
+ return queuedOpportunisticContainers.contains(containerId);
+ }
+ }
+}