diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 77ea4fa..2a475ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -234,7 +234,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, this.dirsHandler = dirsHandler; // ContainerManager level dispatcher. - dispatcher = new AsyncDispatcher("NM ContainerManager dispatcher"); + dispatcher = createDispatcher(); this.deletionService = deletionContext; this.metrics = metrics; @@ -293,6 +293,10 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, this.writeLock = lock.writeLock(); } + protected AsyncDispatcher createDispatcher() { + return new AsyncDispatcher("NM ContainerManager dispatcher"); + } + @Override public void serviceInit(Configuration conf) throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 8e9bad8..068056d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -269,7 +269,7 @@ protected void serviceInit(Configuration myConf) throws Exception { /** * Check all prerequisites for NM over-allocation. */ - private void checkOverAllocationPrerequisites() throws YarnException { + protected void checkOverAllocationPrerequisites() throws YarnException { // LinuxContainerExecutor is required to enable overallocation if (!(containerExecutor instanceof LinuxContainerExecutor)) { throw new YarnException(LinuxContainerExecutor.class.getName() + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java index 86b3698..a3e19eb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java @@ -56,11 +56,8 @@ public ResourceUtilization getCurrentUtilization() { return this.containersAllocation; } - /** - * Get the amount of resources that have not been allocated to containers. - * @return Resource resources that have not been allocated to containers. - */ - protected Resource getUnallocatedResources() { + @Override + public Resource getUnallocatedResources() { // unallocated resources = node capacity - containers allocation // = -(container allocation - node capacity) ResourceUtilization allocationClone = @@ -80,7 +77,6 @@ protected Resource getUnallocatedResources() { return unallocated; } - @Override public Resource getAvailableResources() { return getUnallocatedResources(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index f5a86e9..304c2b8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -215,7 +215,9 @@ public void handle(ContainerSchedulerEvent event) { startPendingContainers(false); break; case SCHEDULE_CONTAINERS: - startPendingContainers(true); + // try to launch OPPORTUNISTIC containers allowing over-allocation if the + // node utilization is low. + startOpportunisticContainers(utilizationTracker.getAvailableResources()); break; default: LOG.error("Unknown event arrived at ContainerScheduler: " @@ -402,11 +404,7 @@ private void onResourcesReclaimed(Container container) { this.metrics.completeOpportunisticContainer(container.getResource()); } - // In case of over-allocation being turned on, we may need to reclaim - // more resources since the opportunistic containers that have been - // killed or paused may have not released as much resource as we need. - boolean reclaimOpportunisticResources = context.isOverAllocationEnabled(); - startPendingContainers(reclaimOpportunisticResources); + startPendingContainers(false); } } @@ -426,22 +424,32 @@ private void startPendingContainers(boolean reclaimOpportunisticResources) { return; } + boolean guaranteedContainersLaunched = startGuaranteedContainers( + reclaimOpportunisticResources); + + if (guaranteedContainersLaunched) { + // if all GUARANTEED containers are launched, try to launch OPPORTUNISTIC + // containers with unallocated resources only. + startOpportunisticContainers( + utilizationTracker.getUnallocatedResources()); + } + } + + private boolean startGuaranteedContainers( + boolean reclaimOpportunisticResources) { Resource available = utilizationTracker.getAvailableResources(); // Start guaranteed containers that are queued, if resources available. boolean allGuaranteedContainersLaunched = startGuaranteedContainers(available); - // Start opportunistic containers, if resources available, which is true - // if all guaranteed containers in queue have been launched. - if (allGuaranteedContainersLaunched) { - startOpportunisticContainers(available); - } else { + + if (!allGuaranteedContainersLaunched && reclaimOpportunisticResources) { // If not all guaranteed containers in queue are launched, we may need // to reclaim resources from opportunistic containers that are running. - if (reclaimOpportunisticResources) { - reclaimOpportunisticContainerResources(); - } + reclaimOpportunisticContainerResources(); } + + return allGuaranteedContainersLaunched; } /** @@ -685,8 +693,6 @@ private boolean hasSufficientResources( * * If the node is over-allocating itself, this may cause not enough * OPPORTUNISTIC containers being killed/paused in cases where the running - * OPPORTUNISTIC containers are not consuming fully their resource requests. - * We'd check again upon container completion events to see if more running * OPPORTUNISTIC containers need to be killed/paused. * * @return the amount of resource needed to be reclaimed for this container diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java index 98d99c6..7a7c78e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java @@ -45,6 +45,12 @@ Resource getAvailableResources(); /** + * Get the amount of resources left un-allocated. + * @return Resource unallocated resources + */ + Resource getUnallocatedResources(); + + /** * Add Container's resources to Node Utilization upon container launch. * @param container Container. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java index 6f9bc82..f976700 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java @@ -61,7 +61,7 @@ public void containerReleased(Container container) { @Override public Resource getAvailableResources() { - Resource resourceBasedOnAllocation = getUnallocatedResources(); + Resource resourceBasedOnAllocation = super.getUnallocatedResources(); Resource resourceBasedOnUtilization = getResourcesAvailableBasedOnUtilization(); if (LOG.isDebugEnabled()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java index 384b116..183d868 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.ConfigurationException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.NMTokenIdentifier; @@ -114,6 +115,9 @@ public void setup() throws IOException { conf.setFloat( YarnConfiguration.NM_OVERALLOCATION_MEMORY_UTILIZATION_THRESHOLD, 0.75f); + // disable the monitor thread in ContainersMonitor to allow control over + // when opportunistic containers are launched with over-allocation + conf.setBoolean(YarnConfiguration.NM_CONTAINER_MONITOR_ENABLED, false); super.setup(); } @@ -190,6 +194,27 @@ public void testStartOppContainersWithPartialOverallocationLowUtilization() BuilderUtils.newResource(512, 1), false)) )); + ((LongRunningContainerSimulatingContainersManager) containerManager) + .drainAsyncEvents(); + + // this container is not expected to be started immediately because + // opportunistic containers cannot be started if the node would be + // over-allocated + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(2), ContainerSubState.SCHEDULED); + + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.SCHEDULED); + } + }); + + // try to start opportunistic containers out of band. + ((LongRunningContainerSimulatingContainersManager) containerManager) + .startContainersOutOfBandUponLowUtilization(); + // this container is expected to be started immediately because there // are (memory: 1024, vcore: 0.625) available based on over-allocation BaseContainerManagerTest.waitForContainerSubState( @@ -244,6 +269,16 @@ public void testQueueOppContainerWithPartialOverallocationHighUtilization() createStartContainerRequest(2, BuilderUtils.newResource(512, 1), false)) )); + + // try to start opportunistic containers out of band because they can + // not be launched at container scheduler event if the node would be + // over-allocated. + ((LongRunningContainerSimulatingContainersManager) containerManager) + .startContainersOutOfBandUponLowUtilization(); + + ((LongRunningContainerSimulatingContainersManager) containerManager) + .drainAsyncEvents(); + // this container will not start immediately because there is not // enough resource available at the moment either in terms of // resources unallocated or in terms of the actual utilization @@ -299,6 +334,27 @@ public void testStartOppContainersWithOverallocationLowUtilization() BuilderUtils.newResource(512, 1), false)) )); + ((LongRunningContainerSimulatingContainersManager) containerManager) + .drainAsyncEvents(); + + // this container is not expected to be started immediately because + // opportunistic containers cannot be started if the node would be + // over-allocated + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(2), ContainerSubState.SCHEDULED); + + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.SCHEDULED); + } + }); + + // try to start opportunistic containers out of band. + ((LongRunningContainerSimulatingContainersManager) containerManager) + .startContainersOutOfBandUponLowUtilization(); + // this container is expected to be started because there is resources // available because the actual utilization is very low BaseContainerManagerTest.waitForContainerSubState(containerManager, @@ -355,6 +411,9 @@ public void testQueueOppContainersWithFullUtilization() throws Exception { containerManager.startContainers( StartContainersRequest.newInstance(moreContainerRequests)); + ((LongRunningContainerSimulatingContainersManager) containerManager) + .drainAsyncEvents(); + // All OPPORTUNISTIC containers but the last one should be queued. // The last OPPORTUNISTIC container to launch should be killed. BaseContainerManagerTest.waitForContainerState( @@ -580,13 +639,27 @@ public void testKillNoOppContainersWithOverallocationLowUtilization() } } )); - // All three GUARANTEED containers are all expected to start - // because the containers utilization is low (0 at the point) + ((LongRunningContainerSimulatingContainersManager) containerManager) + .drainAsyncEvents(); + + // Two OPPORTUNISTIC containers are expected to start with the + // unallocated resources, but one will be queued because no + // over-allocation is allowed at container scheduler events. BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(0), ContainerSubState.RUNNING); BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(1), ContainerSubState.RUNNING); BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.SCHEDULED); + + // try to start the opportunistic container out of band because it can + // not be launched at container scheduler event if the node would be + // over-allocated. + ((LongRunningContainerSimulatingContainersManager) containerManager) + .startContainersOutOfBandUponLowUtilization(); + + // now the queued opportunistic container should also start + BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(2), ContainerSubState.RUNNING); // the containers utilization is low @@ -625,7 +698,6 @@ public void testKillNoOppContainersWithOverallocationLowUtilization() * and fourth OPPORTUNISTIC container (which releases no resources) and * then the second OPPORTUNISTIC container. */ - @Test public void testKillOppContainersConservativelyWithOverallocationHighUtilization() throws Exception { @@ -730,7 +802,7 @@ public void testStartOppContainersUponContainerCompletion() throws Exception { BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(2), ContainerSubState.RUNNING); - // the contianers utilization is at the overallocation threshold + // the container utilization is at the overallocation threshold setContainerResourceUtilization( ResourceUtilization.newInstance(1536, 0, 1.0f/2)); @@ -740,7 +812,7 @@ public void testStartOppContainersUponContainerCompletion() throws Exception { add(createStartContainerRequest(3, BuilderUtils.newResource(512, 1), false)); add(createStartContainerRequest(4, - BuilderUtils.newResource(512, 1), false)); + BuilderUtils.newResource(800, 1), false)); } } )); @@ -759,12 +831,35 @@ public void testStartOppContainersUponContainerCompletion() throws Exception { BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(2), ContainerSubState.DONE); - // the two OPPORTUNISTIC containers are expected to start together + ((LongRunningContainerSimulatingContainersManager) containerManager) + .drainAsyncEvents(); + + // only one OPPORTUNISTIC container is start because no over-allocation + // is allowed to start OPPORTUNISTIC containers at container finish event. BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(3), ContainerSubState.RUNNING); BaseContainerManagerTest.waitForContainerSubState(containerManager, - createContainerId(4), ContainerSubState.RUNNING); + createContainerId(4), ContainerSubState.SCHEDULED); + + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.DONE); + put(createContainerId(3), ContainerSubState.RUNNING); + put(createContainerId(4), ContainerSubState.SCHEDULED); + } + }); + // now try to start the OPPORTUNISTIC container that was queued because + // we don't start OPPORTUNISTIC containers at container finish event if + // the node would be over-allocated + ((LongRunningContainerSimulatingContainersManager) containerManager) + .startContainersOutOfBandUponLowUtilization(); + ((LongRunningContainerSimulatingContainersManager) containerManager) + .drainAsyncEvents(); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(4), ContainerSubState.RUNNING); verifyContainerStatuses(new HashMap() { { put(createContainerId(0), ContainerSubState.RUNNING); @@ -817,7 +912,7 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception { setContainerResourceUtilization( ResourceUtilization.newInstance(1536, 0, 1.0f/2)); - // try to start containers out of band. + // try to start opportunistic containers out of band. ((LongRunningContainerSimulatingContainersManager)containerManager) .startContainersOutOfBandUponLowUtilization(); @@ -937,6 +1032,11 @@ protected UserGroupInformation getRemoteUgi() throws YarnException { return ugi; } + @Override + protected AsyncDispatcher createDispatcher() { + return new DrainDispatcher(); + } + /** * Create a container launcher that signals container processes * with a dummy pid. The container processes are simulated in @@ -977,6 +1077,10 @@ public void startContainersOutOfBandUponLowUtilization() { ((ContainerMonitorForOverallocationTest) getContainersMonitor()) .attemptToStartContainersUponLowUtilization(); } + + public void drainAsyncEvents() { + ((DrainDispatcher) dispatcher).await(); + } } /** @@ -1113,6 +1217,12 @@ public ContainersResourceUtilization getContainersUtilization( containerResourceUsage, System.currentTimeMillis()); } + @Override + protected void checkOverAllocationPrerequisites() { + // do not check + } + + public void setContainerResourceUsage( ResourceUtilization containerResourceUsage) { this.containerResourceUsage = containerResourceUsage;