fStates = new HashSet<>(finalStates);
+ int timeoutSecs = 0;
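+ // Poll the reported sub-state once per second until the container
+ // reaches one of the desired states or timeOutMax seconds elapse.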
+ do {
+ Thread.sleep(1000);
+ containerStatus =
+ containerManager.getContainerStatuses(request)
+ .getContainerStatuses().get(0);
+ LOG.info("Waiting for container to get into one of states " + fStates
+ + ". Current state is " + containerStatus.getContainerSubState());
+ timeoutSecs += 1;
+ } while (!fStates.contains(containerStatus.getContainerSubState())
+ && timeoutSecs < timeOutMax);
+ LOG.info("Container state is " + containerStatus.getContainerSubState());
+ Assert.assertTrue("ContainerSubState is not correct (timed out)",
+ fStates.contains(containerStatus.getContainerSubState()));
+ }
+
public static void waitForApplicationState(
ContainerManagerImpl containerManager, ApplicationId appID,
ApplicationState finalState)
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 6d198a4..e59ba2c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -108,6 +108,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.AbstractContainerScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.DefaultContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -203,6 +205,12 @@ protected UserGroupInformation getRemoteUgi() throws YarnException {
.getKeyId()));
return ugi;
}
+
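+ // Pin this test to the default scheduler now that
+ // createContainerScheduler() returns the AbstractContainerScheduler
+ // base type.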
+ @Override
+ protected AbstractContainerScheduler createContainerScheduler(
+ Context context) {
+ return new DefaultContainerScheduler(context, dispatcher, metrics);
+ }
};
}
@@ -538,10 +546,12 @@ private String doRestartTests(ContainerId cId, File oldStartFile,
ResourceUtilization beforeUpgrade =
ResourceUtilization.newInstance(
containerManager.getContainerScheduler().getCurrentUtilization());
+ LOG.info("Before upgrade utilization: " + beforeUpgrade);
prepareContainerUpgrade(autoCommit, false, false, cId, newStartFile);
ResourceUtilization afterUpgrade =
ResourceUtilization.newInstance(
containerManager.getContainerScheduler().getCurrentUtilization());
+ LOG.info("After upgrade utilization: " + afterUpgrade);
Assert.assertEquals("Possible resource leak detected !!",
beforeUpgrade, afterUpgrade);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index cad835c..a97fcf2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -99,7 +99,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.AbstractContainerScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.DefaultContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
@@ -619,7 +620,7 @@ private void commonLaunchContainer(ApplicationId appId, ContainerId cid,
.containermanager.container.ContainerState.RUNNING);
}
- private ContainerManagerImpl createContainerManager(Context context,
+ protected ContainerManagerImpl createContainerManager(Context context,
DeletionService delSrvc) {
return new ContainerManagerImpl(context, exec, delSrvc,
mock(NodeStatusUpdater.class), metrics, dirsHandler) {
@@ -633,8 +634,9 @@ protected void authorizeGetAndStopContainerRequest(
}
}
@Override
- protected ContainerScheduler createContainerScheduler(Context context) {
- return new ContainerScheduler(context, dispatcher, metrics){
+ protected AbstractContainerScheduler createContainerScheduler(
+ Context context) {
+ return new DefaultContainerScheduler(context, dispatcher, metrics){
@Override
public ContainersMonitor getContainersMonitor() {
return new ContainersMonitorImpl(null, null, null) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecoveryWithOpportunisticContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecoveryWithOpportunisticContainerScheduler.java
new file mode 100644
index 0000000..a734d01
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecoveryWithOpportunisticContainerScheduler.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.AbstractContainerScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.OpportunisticContainerScheduler;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test ContainerManager recovery when Opportunistic Container Scheduler
+ * is enabled.
+ */
+public class TestContainerManagerRecoveryWithOpportunisticContainerScheduler
+ extends TestContainerManagerRecovery {
+ public TestContainerManagerRecoveryWithOpportunisticContainerScheduler()
+ throws UnsupportedFileSystemException {
+ }
+
+ @Override
+ protected ContainerManagerImpl createContainerManager(
+ Context context, DeletionService delSrvc) {
+ return new ContainerManagerImpl(context, exec, delSrvc,
+ mock(NodeStatusUpdater.class), metrics, dirsHandler) {
+ @Override
+ protected void authorizeGetAndStopContainerRequest(
+ ContainerId containerId, Container container,
+ boolean stopRequest, NMTokenIdentifier identifier)
+ throws YarnException {
+ if (container == null || container.getUser().equals("Fail")) {
+ throw new YarnException("Reject this container");
+ }
+ }
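+ // Swap in the opportunistic scheduler so the recovery tests
+ // inherited from TestContainerManagerRecovery also run against it.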
+ @Override
+ protected AbstractContainerScheduler createContainerScheduler(
+ Context context) {
+ return new OpportunisticContainerScheduler(
+ context, dispatcher, metrics) {
+ @Override
+ public ContainersMonitor getContainersMonitor() {
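+ // Stub the monitor with fixed allocation values so recovery
+ // does not depend on a live ContainersMonitor.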
+ return new ContainersMonitorImpl(null, null, null) {
+ @Override
+ public float getVmemRatio() {
+ return 2.0f;
+ }
+
+ @Override
+ public long getVmemAllocatedForContainers() {
+ return 20480;
+ }
+
+ @Override
+ public long getPmemAllocatedForContainers() {
+ return (long) 2048 << 20;
+ }
+
+ @Override
+ public long getVCoresAllocatedForContainers() {
+ return 4;
+ }
+ };
+ }
+ };
+ }
+ };
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerWithOpportunisticContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerWithOpportunisticContainerScheduler.java
new file mode 100644
index 0000000..8f1cde5
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerWithOpportunisticContainerScheduler.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.AbstractContainerScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.OpportunisticContainerScheduler;
+
+/**
+ * Test {@link ContainerManagerImpl} with
+ * {@link OpportunisticContainerScheduler}.
+ */
+public class TestContainerManagerWithOpportunisticContainerScheduler
+ extends TestContainerManager {
+ public TestContainerManagerWithOpportunisticContainerScheduler()
+ throws UnsupportedFileSystemException {
+ }
+
+ @Override
+ protected ContainerManagerImpl createContainerManager(
+ DeletionService delSrvc) {
+ return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
+ metrics, dirsHandler) {
+
+ @Override
+ protected UserGroupInformation getRemoteUgi() throws YarnException {
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+ .getNodeId(), user,
+ context.getNMTokenSecretManager().getCurrentKey().getKeyId()));
+ return ugi;
+ }
+
+ @Override
+ protected AbstractContainerScheduler createContainerScheduler(
+ Context context) {
+ return new OpportunisticContainerScheduler(
+ context, dispatcher, metrics);
+ }
+ };
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index 1a263ee..680f340 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -98,7 +98,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.DefaultContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@@ -1325,7 +1325,7 @@ public boolean matches(Object o) {
appBus = mock(EventHandler.class);
LogBus = mock(EventHandler.class);
delService = mock(DeletionService.class);
- schedBus = new ContainerScheduler(context, dispatcher, metrics, 0) {
+ schedBus = new DefaultContainerScheduler(context, dispatcher, metrics, 0) {
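+ // Launch every container immediately instead of queuing it; these
+ // tests exercise the container state machine, not the scheduler.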
@Override
protected void scheduleContainer(Container container) {
container.sendLaunchEvent();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
index 82c2147..07f32ee 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java
@@ -37,11 +37,11 @@
*/
public class TestAllocationBasedResourceUtilizationTracker {
- private ContainerScheduler mockContainerScheduler;
+ private AbstractContainerScheduler mockContainerScheduler;
@Before
public void setup() {
- mockContainerScheduler = mock(ContainerScheduler.class);
+ mockContainerScheduler = mock(AbstractContainerScheduler.class);
ContainersMonitor containersMonitor =
new ContainersMonitorImpl(mock(ContainerExecutor.class),
mock(AsyncDispatcher.class), mock(Context.class));
@@ -68,7 +68,7 @@ public void testHasResourcesAvailable() {
when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4));
for (int i = 0; i < 2; i++) {
Assert.assertTrue(tracker.hasResourcesAvailable(testContainer));
- tracker.addContainerResources(testContainer);
+ tracker.containerLaunched(testContainer);
}
Assert.assertFalse(tracker.hasResourcesAvailable(testContainer));
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerBehaviorCompatibility.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestDefaultContainerSchedulerBehaviorCompatibility.java
similarity index 92%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerBehaviorCompatibility.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestDefaultContainerSchedulerBehaviorCompatibility.java
index 5b99285..4db487e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerBehaviorCompatibility.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestDefaultContainerSchedulerBehaviorCompatibility.java
@@ -36,12 +36,13 @@
import java.util.List;
/**
- * Make sure ContainerScheduler related changes are compatible
+ * Make sure DefaultContainerScheduler related changes are compatible
* with old behavior.
*/
-public class TestContainerSchedulerBehaviorCompatibility
+public class TestDefaultContainerSchedulerBehaviorCompatibility
extends BaseContainerManagerTest {
- public TestContainerSchedulerBehaviorCompatibility()
+
+ public TestDefaultContainerSchedulerBehaviorCompatibility()
throws UnsupportedFileSystemException {
super();
}
@@ -78,7 +79,8 @@ public void testForceStartGuaranteedContainersWhenOppContainerDisabled()
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
- ContainerScheduler cs = containerManager.getContainerScheduler();
+ DefaultContainerScheduler cs =
+ (DefaultContainerScheduler) containerManager.getContainerScheduler();
int nQueuedContainers = cs.getNumQueuedContainers();
int nRunningContainers = cs.getNumRunningContainers();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestDefaultContainerSchedulerQueuing.java
similarity index 97%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestDefaultContainerSchedulerQueuing.java
index 5c72e7e..c6f2f05 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestDefaultContainerSchedulerQueuing.java
@@ -75,16 +75,16 @@
import static org.mockito.Mockito.spy;
/**
- * Tests to verify that the {@link ContainerScheduler} is able to queue and
+ * Tests to verify that the {@link DefaultContainerScheduler} can queue and
* make room for containers.
*/
-public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
- public TestContainerSchedulerQueuing() throws UnsupportedFileSystemException {
+public class TestDefaultContainerSchedulerQueuing
+ extends BaseContainerManagerTest {
+ public TestDefaultContainerSchedulerQueuing()
+ throws UnsupportedFileSystemException {
super();
}
static {
- LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
+ LOG = LoggerFactory.getLogger(TestDefaultContainerSchedulerQueuing.class);
}
private static class Listener implements ContainerStateTransitionListener {
@@ -166,6 +166,12 @@ public long getVCoresAllocatedForContainers() {
}
};
}
+
+ @Override
+ protected AbstractContainerScheduler createContainerScheduler(
+ Context context) {
+ return new DefaultContainerScheduler(context, dispatcher, metrics);
+ }
};
}
@@ -319,8 +325,8 @@ public void testQueueMultipleContainers() throws Exception {
status.getContainerSubState());
}
- ContainerScheduler containerScheduler =
- containerManager.getContainerScheduler();
+ DefaultContainerScheduler containerScheduler =
+ (DefaultContainerScheduler) containerManager.getContainerScheduler();
// Ensure both containers are properly queued.
Assert.assertEquals(2, containerScheduler.getNumQueuedContainers());
Assert.assertEquals(1,
@@ -389,8 +395,8 @@ public void testStartAndQueueMultipleContainers() throws Exception {
}
}
- ContainerScheduler containerScheduler =
- containerManager.getContainerScheduler();
+ DefaultContainerScheduler containerScheduler =
+ (DefaultContainerScheduler) containerManager.getContainerScheduler();
// Ensure two containers are properly queued.
Assert.assertEquals(2, containerScheduler.getNumQueuedContainers());
Assert.assertEquals(0,
@@ -472,8 +478,8 @@ public void testStartOpportunistcsWhenOppQueueIsFull() throws Exception {
System.out.println("\nStatus : [" + status + "]\n");
}
- ContainerScheduler containerScheduler =
- containerManager.getContainerScheduler();
+ DefaultContainerScheduler containerScheduler =
+ (DefaultContainerScheduler) containerManager.getContainerScheduler();
Assert.assertEquals(maxOppQueueLength,
containerScheduler.getNumQueuedContainers());
Assert.assertEquals(0,
@@ -578,7 +584,7 @@ public void testKillOpportunisticForGuaranteedContainer() throws Exception {
@Test
public void testPauseOpportunisticForGuaranteedContainer() throws Exception {
containerManager.start();
- containerManager.getContainerScheduler().
+ ((DefaultContainerScheduler) containerManager.getContainerScheduler()).
setUsePauseEventForPreemption(true);
Listener listener = new Listener();
@@ -758,8 +764,8 @@ public void testQueueShedding() throws Exception {
allRequests = StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
- ContainerScheduler containerScheduler =
- containerManager.getContainerScheduler();
+ DefaultContainerScheduler containerScheduler =
+ (DefaultContainerScheduler) containerManager.getContainerScheduler();
// Ensure all containers are properly queued.
int numTries = 30;
while ((containerScheduler.getNumQueuedContainers() < 6) &&
@@ -771,7 +777,8 @@ public void testQueueShedding() throws Exception {
ContainerQueuingLimit containerQueuingLimit = ContainerQueuingLimit
.newInstance();
containerQueuingLimit.setMaxQueueLength(2);
- containerScheduler.updateQueuingLimit(containerQueuingLimit);
+ containerScheduler.updateOpportunisticContainerQueuingLimit(
+ containerQueuingLimit);
numTries = 30;
while ((containerScheduler.getNumQueuedContainers() > 2) &&
(numTries-- > 0)) {
@@ -852,8 +859,8 @@ public void testContainerDeQueuedAfterAMKill() throws Exception {
allRequests = StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
- ContainerScheduler containerScheduler =
- containerManager.getContainerScheduler();
+ DefaultContainerScheduler containerScheduler =
+ (DefaultContainerScheduler) containerManager.getContainerScheduler();
// Ensure both containers are properly queued.
int numTries = 30;
while ((containerScheduler.getNumQueuedContainers() < 2) &&
@@ -1181,8 +1188,8 @@ public void testPromotionOfOpportunisticContainers() throws Exception {
}
}
- ContainerScheduler containerScheduler =
- containerManager.getContainerScheduler();
+ DefaultContainerScheduler containerScheduler =
+ (DefaultContainerScheduler) containerManager.getContainerScheduler();
// Ensure two containers are properly queued.
Assert.assertEquals(1, containerScheduler.getNumQueuedContainers());
Assert.assertEquals(0,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestDefaultContainerSchedulerRecovery.java
similarity index 94%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestDefaultContainerSchedulerRecovery.java
index 2ae8b97..cc3bf85 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestDefaultContainerSchedulerRecovery.java
@@ -41,11 +41,11 @@
import org.mockito.MockitoAnnotations;
/**
- * Tests to verify that the {@link ContainerScheduler} is able to
+ * Tests to verify that the {@link DefaultContainerScheduler} is able to
* recover active containers based on RecoveredContainerStatus and
* ExecutionType.
*/
-public class TestContainerSchedulerRecovery {
+public class TestDefaultContainerSchedulerRecovery {
@Mock private NMContext context;
@@ -66,10 +66,10 @@
@Mock private AllocationBasedResourceUtilizationTracker
allocationBasedResourceUtilizationTracker;
- @InjectMocks private ContainerScheduler tempContainerScheduler =
- new ContainerScheduler(context, dispatcher, metrics, 0);
+ @InjectMocks private DefaultContainerScheduler tempContainerScheduler =
+ new DefaultContainerScheduler(context, dispatcher, metrics, 0);
- private ContainerScheduler spy;
+ private DefaultContainerScheduler spy;
@Before public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
@@ -80,7 +80,7 @@
.thenReturn(appId);
when(containerId.getContainerId()).thenReturn(123L);
doNothing().when(allocationBasedResourceUtilizationTracker)
- .addContainerResources(container);
+ .containerLaunched(container);
}
@After public void tearDown() {
@@ -102,7 +102,7 @@
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ .containerLaunched(container);
}
/*Test if a container is recovered as QUEUED, OPPORTUNISTIC,
@@ -121,7 +121,7 @@
assertEquals(1, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ .containerLaunched(container);
}
/*Test if a container is recovered as PAUSED, GUARANTEED,
@@ -140,7 +140,7 @@
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ .containerLaunched(container);
}
/*Test if a container is recovered as PAUSED, OPPORTUNISTIC,
@@ -159,7 +159,7 @@
assertEquals(1, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ .containerLaunched(container);
}
/*Test if a container is recovered as LAUNCHED, GUARANTEED,
@@ -178,7 +178,7 @@
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(1, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1))
- .addContainerResources(container);
+ .containerLaunched(container);
}
/*Test if a container is recovered as LAUNCHED, OPPORTUNISTIC,
@@ -197,7 +197,7 @@
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(1, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1))
- .addContainerResources(container);
+ .containerLaunched(container);
}
/*Test if a container is recovered as REQUESTED, GUARANTEED,
@@ -216,7 +216,7 @@
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ .containerLaunched(container);
}
/*Test if a container is recovered as REQUESTED, OPPORTUNISTIC,
@@ -235,7 +235,7 @@
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ .containerLaunched(container);
}
/*Test if a container is recovered as COMPLETED, GUARANTEED,
@@ -254,7 +254,7 @@
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ .containerLaunched(container);
}
/*Test if a container is recovered as COMPLETED, OPPORTUNISTIC,
@@ -273,7 +273,7 @@
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ .containerLaunched(container);
}
/*Test if a container is recovered as GUARANTEED but no executionType set,
@@ -291,7 +291,7 @@
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ .containerLaunched(container);
}
/*Test if a container is recovered as PAUSED but no executionType set,
@@ -309,6 +309,6 @@
assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
- .addContainerResources(container);
+ .containerLaunched(container);
}
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestOpportunisticContainerRecovery.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestOpportunisticContainerRecovery.java
new file mode 100644
index 0000000..d95243d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestOpportunisticContainerRecovery.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
+ .RecoveredContainerStatus;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests to verify that the {@link OpportunisticContainerScheduler} is
+ * able to recover active containers based on RecoveredContainerStatus
+ * and ExecutionType.
+ */
+public class TestOpportunisticContainerRecovery {
+ @Mock
+ private NMContext context;
+
+ @Mock private NodeManagerMetrics metrics;
+
+ @Mock private AsyncDispatcher dispatcher;
+
+ @Mock private ContainerTokenIdentifier token;
+
+ @Mock private ContainerImpl container;
+
+ @Mock private ApplicationId appId;
+
+ @Mock private ApplicationAttemptId appAttemptId;
+
+ @Mock private ContainerId containerId;
+
+ @Mock private AllocationBasedResourceUtilizationTracker
+ allocationBasedResourceUtilizationTracker;
+
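+ // initMocks() injects the mock tracker above into this scheduler,
+ // letting the tests verify containerLaunched() interactions.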
+ @InjectMocks
+ private OpportunisticContainerScheduler containerScheduler =
+ new OpportunisticContainerScheduler(context, dispatcher, metrics);
+
+ private OpportunisticContainerScheduler spy;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ spy = spy(containerScheduler);
+
+ when(containerId.getApplicationAttemptId()).thenReturn(appAttemptId);
+ when(containerId.getApplicationAttemptId().getApplicationId())
+ .thenReturn(appId);
+ when(containerId.getContainerId()).thenReturn(123L);
+ when(container.getContainerId()).thenReturn(containerId);
+ doNothing().when(allocationBasedResourceUtilizationTracker)
+ .containerLaunched(container);
+ }
+
+ /**
+ * Test the recovery of a QUEUED GUARANTEED container.
+ * It should be launched immediately.
+ */
+ @Test
+ public void testRecoverContainerQueuedGuaranteed()
+ throws IllegalArgumentException, IllegalAccessException {
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
+ when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
+ when(container.getContainerTokenIdentifier()).thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+
+ verify(allocationBasedResourceUtilizationTracker, times(1))
+ .containerLaunched(container);
+ verify(container, times(1)).sendLaunchEvent();
+ assertEquals(1, spy.getNumberOfRunningContainers());
+ }
+
+ /**
+ * Test the recovery of a QUEUED OPPORTUNISTIC container.
+ * It should be queued.
+ */
+ @Test public void testRecoverContainerQueuedOpportunistic()
+ throws IllegalArgumentException, IllegalAccessException {
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
+ when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
+ when(container.getContainerTokenIdentifier()).thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+
+ verify(allocationBasedResourceUtilizationTracker, times(0))
+ .containerLaunched(container);
+ OpportunisticContainersStatus status =
+ spy.getOpportunisticContainersStatus();
+ assertEquals(1, status.getQueuedOpportContainers());
+ assertEquals(0, status.getRunningOpportContainers());
+ assertEquals(0, spy.getNumberOfRunningContainers());
+ }
+
+ /**
+ * Test the recovery of a LAUNCHED GUARANTEED container.
+ */
+ @Test public void testRecoverContainerLaunchedGuaranteed()
+ throws IllegalArgumentException, IllegalAccessException {
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED;
+ when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
+ when(container.getContainerTokenIdentifier()).thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+
+ verify(allocationBasedResourceUtilizationTracker, times(1))
+ .containerLaunched(container);
+ assertEquals(1, spy.getNumberOfRunningContainers());
+ }
+
+ /**
+ * Test the recovery of a LAUNCHED OPPORTUNISTIC container.
+ */
+ @Test public void testRecoverContainerLaunchedOpportunistic()
+ throws IllegalArgumentException, IllegalAccessException {
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED;
+ when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
+ when(container.getContainerTokenIdentifier()).thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+
+ verify(allocationBasedResourceUtilizationTracker, times(1))
+ .containerLaunched(container);
+ assertEquals(1, spy.getNumberOfRunningContainers());
+ }
+
+ /**
+ * Test the recovery of a REQUESTED GUARANTEED container.
+ * It should be ignored.
+ */
+ @Test public void testRecoverContainerRequestedGuaranteed()
+ throws IllegalArgumentException, IllegalAccessException {
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED;
+ when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
+ when(container.getContainerTokenIdentifier()).thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+
+ assertEquals(0, spy.getNumberOfRunningContainers());
+ verify(allocationBasedResourceUtilizationTracker, times(0))
+ .containerLaunched(container);
+ }
+
+ /**
+ * Test the recovery of a REQUESTED OPPORTUNISTIC container.
+ * It should be ignored.
+ */
+ @Test public void testRecoverContainerRequestedOpportunistic()
+ throws IllegalArgumentException, IllegalAccessException {
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED;
+ when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
+ when(container.getContainerTokenIdentifier()).thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+
+ OpportunisticContainersStatus status =
+ spy.getOpportunisticContainersStatus();
+ assertEquals(0, status.getRunningOpportContainers());
+ assertEquals(0, status.getQueuedOpportContainers());
+ verify(allocationBasedResourceUtilizationTracker, times(0))
+ .containerLaunched(container);
+ }
+
+ /**
+ * Test the recovery of a PAUSED GUARANTEED container.
+ * It should be ignored.
+ */
+ @Test public void testRecoverContainerPausedGuaranteed()
+ throws IllegalArgumentException, IllegalAccessException {
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
+ when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
+ when(container.getContainerTokenIdentifier()).thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+
+ assertEquals(0, spy.getNumberOfRunningContainers());
+ verify(allocationBasedResourceUtilizationTracker, times(0))
+ .containerLaunched(container);
+ }
+
+ /**
+ * Test the recovery of a PAUSED OPPORTUNISTIC container.
+ * It should be ignored.
+ */
+ @Test public void testRecoverContainerPausedOpportunistic()
+ throws IllegalArgumentException, IllegalAccessException {
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
+ when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
+ when(container.getContainerTokenIdentifier()).thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+
+ OpportunisticContainersStatus status =
+ spy.getOpportunisticContainersStatus();
+ assertEquals(0, status.getRunningOpportContainers());
+ assertEquals(0, status.getQueuedOpportContainers());
+ verify(allocationBasedResourceUtilizationTracker, times(0))
+ .containerLaunched(container);
+ }
+
+ /**
+ * Test the recovery of a COMPLETED GUARANTEED container.
+ * It should be ignored.
+ */
+ @Test public void testRecoverContainerCompletedGuaranteed()
+ throws IllegalArgumentException, IllegalAccessException {
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED;
+ when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
+ when(container.getContainerTokenIdentifier()).thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+
+ assertEquals(0, spy.getNumberOfRunningContainers());
+ verify(allocationBasedResourceUtilizationTracker, times(0))
+ .containerLaunched(container);
+ }
+
+ /**
+ * Test the recovery of a COMPLETED OPPORTUNISTIC container.
+ * It should be ignored.
+ */
+ @Test public void testRecoverContainerCompletedOpportunistic()
+ throws IllegalArgumentException, IllegalAccessException {
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED;
+ when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
+ when(container.getContainerTokenIdentifier()).thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+
+ OpportunisticContainersStatus status =
+ spy.getOpportunisticContainersStatus();
+ assertEquals(0, status.getRunningOpportContainers());
+ assertEquals(0, status.getQueuedOpportContainers());
+ verify(allocationBasedResourceUtilizationTracker, times(0))
+ .containerLaunched(container);
+ }
+
+ /**
+ * Test the recovery of a QUEUED container without any ExecutionType.
+ * It should be ignored.
+ */
+ @Test public void testContainerQueuedNoExecType()
+ throws IllegalArgumentException, IllegalAccessException {
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
+ when(container.getContainerTokenIdentifier()).thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+
+ OpportunisticContainersStatus status =
+ spy.getOpportunisticContainersStatus();
+ assertEquals(0, status.getRunningOpportContainers());
+ assertEquals(0, status.getQueuedOpportContainers());
+ verify(allocationBasedResourceUtilizationTracker, times(0))
+ .containerLaunched(container);
+ }
+
+ /**
+ * Test the recovery of a PAUSED container without any ExecutionType.
+ * It should be ignored.
+ */
+ @Test public void testContainerPausedNoExecType()
+ throws IllegalArgumentException, IllegalAccessException {
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
+ when(container.getContainerTokenIdentifier()).thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+
+ OpportunisticContainersStatus status =
+ spy.getOpportunisticContainersStatus();
+ assertEquals(0, status.getRunningOpportContainers());
+ assertEquals(0, status.getQueuedOpportContainers());
+ verify(allocationBasedResourceUtilizationTracker, times(0))
+ .containerLaunched(container);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestOpportunisticContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestOpportunisticContainerScheduler.java
new file mode 100644
index 0000000..403f361
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestOpportunisticContainerScheduler.java
@@ -0,0 +1,1297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerSubState;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.exceptions.ConfigurationException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+
+import org.eclipse.jetty.util.ConcurrentHashSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Unit tests for {@link OpportunisticContainerScheduler}.
+ */
+public class TestOpportunisticContainerScheduler
+ extends BaseContainerManagerTest {
+
+ private static final int NM_OPPORTUNISTIC_QUEUE_LIMIT = 3;
+ private static final int NM_CONTAINERS_VCORES = 4;
+ private static final int NM_CONTAINERS_MEMORY_MB = 2048;
+
+ static {
+ LOG = LoggerFactory.getLogger(TestOpportunisticContainerScheduler.class);
+ }
+
+ public TestOpportunisticContainerScheduler()
+ throws UnsupportedFileSystemException {
+ }
+
+ @Override
+ protected ContainerExecutor createContainerExecutor() {
+ DefaultContainerExecutor exec =
+ new LongRunningContainerSimulatingContainerExecutor();
+ exec.setConf(conf);
+ return exec;
+ }
+
+ @Override
+ protected ContainerManagerImpl createContainerManager(
+ DeletionService delSrvc) {
+ return new LongRunningContainerSimulatingContainersManager(
+ context, exec, delSrvc, nodeStatusUpdater, metrics, dirsHandler, user) {
+ // use OpportunisticContainerScheduler
+ @Override
+ protected AbstractContainerScheduler
+ createContainerScheduler(Context cntxt) {
+ return new OpportunisticContainerSchedulerForTest(
+ context, dispatcher, metrics);
+ }
+ };
+ }
+
+ @Override
+ public void setup() throws IOException {
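+ // Queue at most 3 OPPORTUNISTIC containers, and (assuming the usual
+ // threshold semantics) allow over-allocation while CPU and memory
+ // utilization stay below 75% of the node's capacity.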
+ conf.setInt(
+ YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
+ NM_OPPORTUNISTIC_QUEUE_LIMIT);
+ conf.setFloat(
+ YarnConfiguration.NM_OVERALLOCATION_CPU_UTILIZATION_THRESHOLD,
+ 0.75f);
+ conf.setFloat(
+ YarnConfiguration.NM_OVERALLOCATION_MEMORY_UTILIZATION_THRESHOLD,
+ 0.75f);
+ // disable container monitor thread
+ conf.setBoolean(YarnConfiguration.NM_CONTAINER_MONITOR_ENABLED, false);
+
+ super.setup();
+ }
+
+ /**
+ * Start running one GUARANTEED container and queue two OPPORTUNISTIC ones.
+ * Try killing one of the two queued containers.
+ * @throws Exception
+ */
+ @Test
+ public void testStopQueuedContainer() throws Exception {
+ containerManager.start();
+
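+ // The 2048 MB GUARANTEED container fills the node, so the two
+ // OPPORTUNISTIC requests below will be queued.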
+ StartContainersRequest allRequests = StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() { {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(2048, 1), true));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), false));
+ } }
+ );
+ containerManager.startContainers(allRequests);
+
+ waitForOpportunisticContainerToBeQueued(createContainerId(1));
+ waitForOpportunisticContainerToBeQueued(createContainerId(2));
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(0), ContainerSubState.RUNNING);
+
+ // Assert there is initially one container running and two queued.
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.SCHEDULED);
+ put(createContainerId(2), ContainerSubState.SCHEDULED);
+ }
+ });
+
+ // Stop one of the two queued containers.
+ StopContainersRequest stopRequest = StopContainersRequest.
+ newInstance(Arrays.asList(createContainerId(1)));
+ containerManager.stopContainers(stopRequest);
+
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(1), ContainerSubState.DONE);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.DONE);
+ put(createContainerId(2), ContainerSubState.SCHEDULED);
+ }
+ });
+ }
+
+ /**
+ * 1. Submit a long running GUARANTEED container to hog all NM resources.
+ * 2. Submit 3 OPPORTUNISTIC containers, all of which will be queued.
+ * 3. Update the queue limit to 2.
+ * 4. Ensure only 2 containers remain in the queue, and 1 is de-queued.
+ * @throws Exception
+ */
+ @Test
+ public void testQueueShedding() throws Exception {
+ containerManager.start();
+
+ // start a long running GUARANTEED container to hog all NM resources.
+ StartContainersRequest allRequests = StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() { {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(2048, 1), true));
+ } }
+ );
+ containerManager.startContainers(allRequests);
+
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(0), ContainerSubState.RUNNING);
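+ // Report the GUARANTEED container as using its full allocation so
+ // none of the queued OPPORTUNISTIC containers can be over-allocated.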
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(2048, 0, 0.25f));
+
+ // try to start 3 OPPORTUNISTIC containers, all of which will be queued.
+ allRequests = StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() { {
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(3,
+ BuilderUtils.newResource(512, 1), false));
+ } }
+ );
+ containerManager.startContainers(allRequests);
+ waitForOpportunisticContainerToBeQueued(createContainerId(1));
+ waitForOpportunisticContainerToBeQueued(createContainerId(2));
+ waitForOpportunisticContainerToBeQueued(createContainerId(3));
+
+ // update the queue limit to 2
+ ContainerQueuingLimit containerQueuingLimit = ContainerQueuingLimit
+ .newInstance();
+ containerQueuingLimit.setMaxQueueLength(2);
+ containerManager.getContainerScheduler()
+ .updateOpportunisticContainerQueuingLimit(containerQueuingLimit);
+
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(3), ContainerSubState.DONE);
+
+ // Verify that one OPPORTUNISTIC container was de-queued and two
+ // remain queued.
+ List<ContainerId> statList = new ArrayList<>();
+ for (int i = 1; i < 4; i++) {
+ statList.add(createContainerId(i));
+ }
+ GetContainerStatusesRequest statRequest =
+ GetContainerStatusesRequest.newInstance(statList);
+ List<ContainerStatus> containerStatuses = containerManager
+ .getContainerStatuses(statRequest).getContainerStatuses();
+
+ int deQueuedContainers = 0;
+ int numQueuedOppContainers = 0;
+ for (ContainerStatus status : containerStatuses) {
+ if (status.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ if (status.getDiagnostics().contains(
+ "Container De-queued to meet NM queuing limits")) {
+ deQueuedContainers++;
+ }
+ if (ContainerSubState.SCHEDULED == status.getContainerSubState()) {
+ numQueuedOppContainers++;
+ }
+ }
+ }
+ Assert.assertEquals(1, deQueuedContainers);
+ Assert.assertEquals(2, numQueuedOppContainers);
+ }
+
+ /**
+ * Start one GUARANTEED and one OPPORTUNISTIC container, which in aggregate
+ * do not exceed the capacity of the node. The GUARANTEED container is
+ * expected to start running immediately; the OPPORTUNISTIC container is
+ * expected to start running after the out-of-band container scheduling
+ * check is done.
+ */
+ @Test
+ public void testStartMultipleContainersWithoutOverallocation()
+ throws Exception {
+ containerManager.start();
+
+ StartContainersRequest allRequests = StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() { {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(1024, 1), true));
+ } }
+ );
+ containerManager.startContainers(allRequests);
+
+ // OPPORTUNISTIC containers are always queued first
+ waitForOpportunisticContainerToBeQueued(createContainerId(0));
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(1), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.SCHEDULED);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ }
+ });
+
+ // the GUARANTEED container is fully utilizing its resources
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1024, 0, 0.5f));
+ // start the OPPORTUNISTIC container in an out-of-band fashion
+ ((LongRunningContainerSimulatingContainersManager)containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // the OPPORTUNISTIC container is expected to be launched and start running
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(1), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start one GUARANTEED and one OPPORTUNISTIC container whose utilization
+ * is very low relative to their resource requests, resulting in a low node
+ * utilization. Then start another OPPORTUNISTIC container which requests
+ * more than what's left unallocated on the node. Because overallocation
+ * is turned on and node utilization is low, the second OPPORTUNISTIC
+ * container is also expected to be launched.
+ */
+ @Test
+ public void testStartOppContainersWithPartialOverallocationLowUtilization()
+ throws Exception {
+ containerManager.start();
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), true));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(824, 1), false));
+ }
+ }
+ ));
+
+ // the GUARANTEED container is expected to start running and
+ // the OPPORTUNISTIC is supposed to be queued
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(0), ContainerSubState.RUNNING);
+ waitForOpportunisticContainerToBeQueued(createContainerId(1));
+
+ // the GUARANTEED container's utilization is very low
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(256, 0, 1.0f/8));
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // the OPPORTUNISTIC container is expected to be running
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(1), ContainerSubState.RUNNING);
+
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(512, 0, 1.0f/6));
+
+ // start a container that requests more than what's left unallocated
+ // 512 + 1024 + 824 > 2048
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), false))
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(2));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // this OPPORTUNISTIC container is expected to be launched because there is
+ // (memory: 1024 MB, vcores: 0.625) available based on the over-allocation
+ // threshold
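+ // A sketch of the arithmetic, assuming a 0.75 memory over-allocation
+ // threshold on a 2048 MB node: up to 2048 * 0.75 = 1536 MB of utilization
+ // is allowed; with 512 MB currently utilized, 1536 - 512 = 1024 MB of
+ // headroom remains, which fits the 512 MB request.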
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(2), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start one GUARANTEED and one OPPORTUNISTIC container which utilize most
+ * of the resources they requested, resulting in a high node utilization.
+ * Then start another OPPORTUNISTIC container which requests more than what's
+ * left unallocated on the node. Because of the high resource utilization on
+ * the node, the projected utilization, if we were to start the second
+ * OPPORTUNISTIC container, would go over the NM overallocation
+ * threshold, so the second OPPORTUNISTIC container is expected to be queued.
+ */
+ @Test
+ public void testQueueOppContainerWithPartialOverallocationHighUtilization()
+ throws Exception {
+ containerManager.start();
+
+ // try to start one GUARANTEED, one OPPORTUNISTIC container
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), true));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(824, 1), false));
+ }
+ }
+ ));
+
+ // the GUARANTEED container is expected to start running,
+ // whereas the OPPORTUNISTIC one is supposed to be queued.
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(0), ContainerSubState.RUNNING);
+ waitForOpportunisticContainerToBeQueued(createContainerId(1));
+
+ // try to launch the OPPORTUNISTIC container
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1024, 0, 1.0f/8));
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+ // the OPPORTUNISTIC container is expected to be running
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(
+ containerManager, createContainerId(1), ContainerSubState.RUNNING);
+
+ // the aggregate utilization of the two containers is very high
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1500, 0, 1.0f/6));
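+ // Assuming the same 0.75 memory over-allocation threshold, the projected
+ // utilization if the 512 MB container below were started would be roughly
+ // 1500 + 512 = 2012 MB, above the 1536 MB limit, so it should stay queued.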
+
+ // try to start a container that requests more than what's left unallocated
+ // 512 + 1024 + 824 > 2048
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), false))
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(2));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // this container will not start running because there is not
+ // enough resource available at the moment either in terms of
+ // resources unallocated or in terms of the actual utilization
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.SCHEDULED);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.SCHEDULED);
+ }
+ });
+ }
+
+ /**
+ * Start two GUARANTEED containers which in aggregate take up the whole node
+ * capacity, yet whose utilization is low relative to their resource requests,
+ * resulting in a low node resource utilization. Then try to start another
+ * OPPORTUNISTIC container. Because the resource utilization across the node
+ * is low and over-allocation is turned on, the OPPORTUNISTIC container is
+ * expected to be launched even though there are no resources left unallocated.
+ */
+ @Test
+ public void testStartOppContainersWithOverallocationLowUtilization()
+ throws Exception {
+ containerManager.start();
+
+ // try to start two GUARANTEED containers that take up the whole node
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), true));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(1024, 1), true));
+ }
+ }
+ ));
+ // both GUARANTEED containers are expected to be running immediately
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // the current utilization of the two GUARANTEED containers is low
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(800, 0, 1.0f/8));
+
+ // try to start an OPPORTUNISTIC container when there are no resources
+ // left unallocated.
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), false))
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(2));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // the OPPORTUNISTIC container is expected to be started because there
+ // are resources available since the actual utilization is very low
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start two GUARANTEED containers which in aggregate take up the whole node
+ * capacity and fully utilize the resources they requested. Then try to start
+ * four OPPORTUNISTIC containers, of which three will be queued and one will
+ * be killed because the max queue length is 3.
+ */
+ @Test
+ public void testQueueOppContainersWithFullUtilization() throws Exception {
+ containerManager.start();
+
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), true));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(1024, 1), true));
+ }
+ }
+ ));
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // the containers are fully utilizing their resources
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(2048, 0, 1.0f/8));
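+ // With 2048 MB utilized out of the node's assumed 2048 MB capacity, there
+ // is neither unallocated capacity nor utilization headroom left, so any
+ // new OPPORTUNISTIC container can at best be queued.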
+
+ // start more OPPORTUNISTIC containers than what the OPPORTUNISTIC container
+ // queue can hold when there is no unallocated resource left.
+ List<StartContainerRequest> moreContainerRequests =
+ new ArrayList<>(NM_OPPORTUNISTIC_QUEUE_LIMIT + 1);
+ for (int a = 0; a < NM_OPPORTUNISTIC_QUEUE_LIMIT + 1; a++) {
+ moreContainerRequests.add(
+ createStartContainerRequest(2 + a,
+ BuilderUtils.newResource(512, 1), false));
+ }
+ containerManager.startContainers(
+ StartContainersRequest.newInstance(moreContainerRequests));
+ for (int a = 0; a < NM_OPPORTUNISTIC_QUEUE_LIMIT; a++) {
+ waitForOpportunisticContainerToBeQueued(createContainerId(2 + a));
+ }
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // All OPPORTUNISTIC containers but the last one should be queued.
+ // The last OPPORTUNISTIC container to launch should be killed.
+ BaseContainerManagerTest.waitForContainerState(
+ containerManager, createContainerId(NM_OPPORTUNISTIC_QUEUE_LIMIT + 2),
+ ContainerState.COMPLETE);
+
+ HashMap<ContainerId, ContainerSubState> expectedContainerStatus =
+ new HashMap<>();
+ expectedContainerStatus.put(
+ createContainerId(0), ContainerSubState.RUNNING);
+ expectedContainerStatus.put(
+ createContainerId(1), ContainerSubState.RUNNING);
+ expectedContainerStatus.put(
+ createContainerId(NM_OPPORTUNISTIC_QUEUE_LIMIT + 2),
+ ContainerSubState.DONE);
+ for (int i = 0; i < NM_OPPORTUNISTIC_QUEUE_LIMIT; i++) {
+ expectedContainerStatus.put(
+ createContainerId(i + 2), ContainerSubState.SCHEDULED);
+ }
+ verifyContainerStatuses(expectedContainerStatus);
+ }
+
+ /**
+ * Start two GUARANTEED containers that together do not take up the
+ * whole node. Then try to start one OPPORTUNISTIC container that will
+ * fit into the remaining unallocated space on the node.
+ * The OPPORTUNISTIC container is expected to start even though the
+ * current node utilization is above the NM overallocation threshold,
+ * because it's always safe to launch containers as long as the node
+ * has not been fully allocated.
+ */
+ @Test
+ public void testStartOppContainerWithHighUtilizationNoOverallocation()
+ throws Exception {
+ containerManager.start();
+
+ // try to start two GUARANTEED containers
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1200, 1), true));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(400, 1), true));
+ }
+ }
+ ));
+ // both containers are expected to be running
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // the containers' utilization is above the over-allocation threshold
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1600, 0, 1.0f/2));
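+ // 1600 MB is above the assumed over-allocation threshold of
+ // 2048 * 0.75 = 1536 MB; the launch below is still expected to succeed
+ // because unallocated capacity, not utilization, gates this path.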
+
+ // try to start an OPPORTUNISTIC container that can just fit in the
+ // remaining unallocated space
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(400, 1), false))
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(2));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // the OPPORTUNISTIC container can be safely launched even though
+ // the container utilization is above the NM overallocation threshold
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start two OPPORTUNISTIC containers first whose utilization is low relative
+ * to the resources they requested, resulting in a low node utilization. Then
+ * try to start a GUARANTEED container which requests more than what's left
+ * unallocated on the node. Because the node utilization is low and NM
+ * overallocation is turned on, the GUARANTEED container is expected to be
+ * started immediately without killing any running OPPORTUNISTIC containers.
+ *
+ * TODO: add preemption check when YARN-6672 is done
+ */
+ @Test
+ public void testKillNoOppContainersWithPartialOverallocationLowUtilization()
+ throws Exception {
+ containerManager.start();
+
+ // try to start two OPPORTUNISTIC containers
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(824, 1), false));
+ }
+ }
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(0));
+ waitForOpportunisticContainerToBeQueued(createContainerId(1));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // both OPPORTUNISTIC containers are expected to be running
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // the current aggregate container utilization is low
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(512, 0, 1.0f/8));
+
+ // start a GUARANTEED container that requests more than what's left
+ // unallocated on the node: (512 + 1024 + 824) > 2048
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), true))
+ ));
+
+ // the GUARANTEED container is expected to be launched immediately
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start two OPPORTUNISTIC containers whose utilization will be high relative
+ * to the resources they requested, resulting in a high node utilization.
+ * Then try to start a GUARANTEED container which requests more than what's
+ * left unallocated on the node. Because the node is under high utilization,
+ * the second OPPORTUNISTIC container is expected to be killed in order to
+ * make room for the GUARANTEED container.
+ *
+ * TODO: add preemption check when YARN-6672 is done
+ */
+ @Test
+ public void testKillNoOppContainersWithPartialOverallocationHighUtilization()
+ throws Exception {
+ containerManager.start();
+
+ // try to start two OPPORTUNISTIC containers
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(824, 1), false));
+ }
+ }
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(0));
+ waitForOpportunisticContainerToBeQueued(createContainerId(1));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // both OPPORTUNISTIC containers are expected to be running
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // the current aggregate container utilization is high
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1800, 0, 1.0f/8));
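+ // Note: GUARANTEED launches are not gated on current utilization here, so
+ // the container below is still expected to start; killing OPPORTUNISTIC
+ // containers to relieve the pressure is deferred to preemption (YARN-6672).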
+
+ // try to start a GUARANTEED container that requests more than what's left
+ // unallocated on the node: (512 + 1024 + 824) > 2048
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(2,
+ BuilderUtils.newResource(512, 1), true))
+ ));
+
+ // the GUARANTEED container is expected to be launched immediately
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ // TODO: when preemption is added, verify enough OPPORTUNISTIC containers
+ // are killed.
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start three OPPORTUNISTIC containers which in aggregate exceed the
+ * capacity of the node, yet whose utilization is low relative
+ * to the resources they requested, resulting in a low node utilization.
+ * Then try to start a GUARANTEED container. Even though the node has
+ * nothing left unallocated, the GUARANTEED container is expected to start
+ * immediately without killing any running OPPORTUNISTIC containers, because
+ * the node utilization is very low and overallocation is turned on.
+ *
+ * TODO: add preemption check when YARN-6672 is done
+ */
+ @Test
+ public void testKillNoOppContainersWithOverallocationLowUtilization()
+ throws Exception {
+ containerManager.start();
+
+ // try to start two OPPORTUNISTIC containers
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(1024, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(1024, 1), false));
+ }
+ }
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(0));
+ waitForOpportunisticContainerToBeQueued(createContainerId(1));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // both OPPORTUNISTIC containers are expected to start running
+ // because the containers' utilization is low (0 at this point)
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+
+ // try to start another OPPORTUNISTIC container
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(2,
+ BuilderUtils.newResource(1024, 1), false));
+ }
+ }
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(2));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // the OPPORTUNISTIC container is also expected to start running
+ // because the aggregate containers utilization is still 0
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ // the current aggregate container utilization is low after launching
+ // three OPPORTUNISTIC containers
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1024, 0, 1.0f/8));
+
+ // try to start a GUARANTEED container that requests more than what's left
+ // unallocated on the node: (512 + 1024 + 1024 + 1024) > 2048
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ Collections.singletonList(
+ createStartContainerRequest(3,
+ BuilderUtils.newResource(512, 1), true))
+ ));
+
+ // the GUARANTEED container is expected to be launched immediately without
+ // killing any OPPORTUNISTIC containers
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(3), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.RUNNING);
+ put(createContainerId(3), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ /**
+ * Start two OPPORTUNISTIC containers followed by one GUARANTEED container,
+ * which in aggregate exceeds the capacity of the node. The first two
+ * OPPORTUNISTIC containers use almost no resources whereas the GUARANTEED
+ * one utilizes nearly all of the resources it requested. Then try to start two
+ * more OPPORTUNISTIC containers. The two OPPORTUNISTIC containers are
+ * expected to be queued immediately. Upon the completion of the
+ * resource-usage-heavy GUARANTEED container, both OPPORTUNISTIC containers
+ * are expected to start.
+ */
+ @Test
+ public void testStartOppContainersUponContainerCompletion() throws Exception {
+ containerManager.start();
+
+ // try to start two OPPORTUNISTIC containers and one GUARANTEED container
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(0,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(1,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(2,
+ BuilderUtils.newResource(1024, 1), true));
+ }
+ }
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(0));
+ waitForOpportunisticContainerToBeQueued(createContainerId(1));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // All three containers are expected to start immediately
+ // because the node utilization is low (0 at this point)
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(0), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(1), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.RUNNING);
+
+ // the aggregate container utilization is at the overallocation threshold
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(1536, 0, 1.0f/2));
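+ // 1536 MB equals the assumed 0.75 * 2048 MB over-allocation threshold,
+ // leaving no utilization headroom for the two OPPORTUNISTIC containers
+ // submitted next, hence they are expected to remain queued.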
+
+ // try to start two OPPORTUNISTIC containers
+ containerManager.startContainers(StartContainersRequest.newInstance(
+ new ArrayList<StartContainerRequest>() {
+ {
+ add(createStartContainerRequest(3,
+ BuilderUtils.newResource(512, 1), false));
+ add(createStartContainerRequest(4,
+ BuilderUtils.newResource(512, 1), false));
+ }
+ }
+ ));
+ waitForOpportunisticContainerToBeQueued(createContainerId(3));
+ waitForOpportunisticContainerToBeQueued(createContainerId(4));
+
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+
+ // the two new OPPORTUNISTIC containers are expected to be queued
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(3), ContainerSubState.SCHEDULED);
+ put(createContainerId(4), ContainerSubState.SCHEDULED);
+ }
+ });
+
+ // the GUARANTEED container completes, releasing its resources
+ setContainerResourceUtilization(
+ ResourceUtilization.newInstance(100, 0, 1.0f/5));
+ allowContainerToSucceed(2);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(2), ContainerSubState.DONE);
+
+ // check again to see if the queued OPPORTUNISTIC containers can be launched
+ ((LongRunningContainerSimulatingContainersManager) containerManager)
+ .attemptToStartOpportunisticContainers();
+ // the two OPPORTUNISTIC containers are expected to start together
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(3), ContainerSubState.RUNNING);
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ createContainerId(4), ContainerSubState.RUNNING);
+
+ verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+ {
+ put(createContainerId(0), ContainerSubState.RUNNING);
+ put(createContainerId(1), ContainerSubState.RUNNING);
+ put(createContainerId(2), ContainerSubState.DONE);
+ put(createContainerId(3), ContainerSubState.RUNNING);
+ put(createContainerId(4), ContainerSubState.RUNNING);
+ }
+ });
+ }
+
+ private void setContainerResourceUtilization(ResourceUtilization usage) {
+ ((ContainerMonitorForOverallocationTest)
+ containerManager.getContainersMonitor())
+ .setContainerResourceUsage(usage);
+ }
+
+ private void allowContainerToSucceed(int containerId) {
+ ((LongRunningContainerSimulatingContainerExecutor) this.exec)
+ .containerSucceeded(createContainerId(containerId));
+ }
+
+ /**
+ * Wait until a given container is queued by the container scheduler.
+ * This is to prevent a race condition where
+ * attemptToStartOpportunisticContainers() is called before the container
+ * is queued.
+ *
+ * TODO: introduce a new container sub-state, QUEUED, in ContainerSubState
+ */
+ private void waitForOpportunisticContainerToBeQueued(ContainerId container)
+ throws Exception {
+ BaseContainerManagerTest.waitForContainerSubState(containerManager,
+ container, ContainerSubState.SCHEDULED);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ OpportunisticContainerSchedulerForTest containerScheduler =
+ (OpportunisticContainerSchedulerForTest)
+ containerManager.getContainerScheduler();
+ return containerScheduler.isQueued(container);
+ }
+ }, 1000, 10000);
+ }
+
+ protected StartContainerRequest createStartContainerRequest(
+ int containerId, Resource resource, boolean isGuaranteed)
+ throws IOException {
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ ExecutionType executionType = isGuaranteed ?
+ ExecutionType.GUARANTEED : ExecutionType.OPPORTUNISTIC;
+ Token containerToken = createContainerToken(
+ createContainerId(containerId),
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user, resource,
+ context.getContainerTokenSecretManager(), null, executionType);
+
+ return StartContainerRequest.newInstance(
+ containerLaunchContext, containerToken);
+ }
+
+ protected void verifyContainerStatuses(
+ Map<ContainerId, ContainerSubState> expected)
+ throws IOException, YarnException {
+ List<ContainerId> statList = new ArrayList<>(expected.keySet());
+ GetContainerStatusesRequest statRequest =
+ GetContainerStatusesRequest.newInstance(statList);
+ List<ContainerStatus> containerStatuses = containerManager
+ .getContainerStatuses(statRequest).getContainerStatuses();
+
+ for (ContainerStatus status : containerStatuses) {
+ ContainerId containerId = status.getContainerId();
+ Assert.assertEquals(containerId + " is in unexpected state",
+ expected.get(containerId), status.getContainerSubState());
+ }
+ }
+
+ /**
+ * A container manager that sends a dummy container pid while it's cleaning
+ * up running containers. Used along with
+ * LongRunningContainerSimulatingContainerExecutor to simulate long running
+ * container processes for testing purposes.
+ */
+ private static class LongRunningContainerSimulatingContainersManager
+ extends ContainerManagerImpl {
+
+ private final String user;
+
+ LongRunningContainerSimulatingContainersManager(
+ Context context, ContainerExecutor exec,
+ DeletionService deletionContext,
+ NodeStatusUpdater nodeStatusUpdater,
+ NodeManagerMetrics metrics,
+ LocalDirsHandlerService dirsHandler, String user) {
+ super(context, exec, deletionContext,
+ nodeStatusUpdater, metrics, dirsHandler);
+ this.user = user;
+ }
+
+ @Override
+ protected UserGroupInformation getRemoteUgi() throws YarnException {
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+ .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
+ .getKeyId()));
+ return ugi;
+ }
+
+ /**
+ * Create a container launcher that signals container processes
+ * with a dummy pid. The container processes are simulated in
+ * LongRunningContainerSimulatingContainerExecutor which does
+ * not write a pid file on behalf of containers to launch, so
+ * the pid does not matter.
+ */
+ @Override
+ protected ContainersLauncher createContainersLauncher(
+ Context context, ContainerExecutor exec) {
+ ContainerManagerImpl containerManager = this;
+ return new ContainersLauncher(context, dispatcher, exec, dirsHandler,
+ this) {
+ @Override
+ protected ContainerLaunch createContainerLaunch(
+ Application app, Container container) {
+ return new ContainerLaunch(context, getConfig(), dispatcher,
+ exec, app, container, dirsHandler, containerManager) {
+ @Override
+ protected String getContainerPid(Path pidFilePath)
+ throws Exception {
+ return "123";
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ protected AsyncDispatcher createDispatcher() {
+ return new DrainDispatcher();
+ }
+
+ @Override
+ protected ContainersMonitor createContainersMonitor(
+ ContainerExecutor exec) {
+ return new ContainerMonitorForOverallocationTest(exec,
+ dispatcher, context);
+ }
+
+ public void attemptToStartOpportunisticContainers() {
+ ((ContainerMonitorForOverallocationTest) getContainersMonitor())
+ .attemptToStartContainersUponLowUtilization();
+ }
+ }
+
+ /**
+ * A container executor that simulates long running container processes
+ * by having container launch threads sleep until they are given
+ * a signal to finish with either a success or failure exit code.
+ */
+ private static class LongRunningContainerSimulatingContainerExecutor
+ extends DefaultContainerExecutor {
+ private ConcurrentHashMap<ContainerId, ContainerFinishLatch> containers =
+ new ConcurrentHashMap<>();
+
+ public void containerSucceeded(ContainerId containerId) {
+ ContainerFinishLatch containerFinishLatch = containers.get(containerId);
+ if (containerFinishLatch != null) {
+ containerFinishLatch.toSucceed();
+ }
+ }
+
+ public void containerFailed(ContainerId containerId) {
+ ContainerFinishLatch containerFinishLatch = containers.get(containerId);
+ if (containerFinishLatch != null) {
+ containerFinishLatch.toFail();
+ }
+ }
+
+ /**
+ * Simulate long running container processes by having container launcher
+ * threads wait indefinitely for a signal to finish.
+ */
+ @Override
+ public int launchContainer(ContainerStartContext ctx)
+ throws IOException, ConfigurationException {
+ ContainerId container = ctx.getContainer().getContainerId();
+ containers.putIfAbsent(container, new ContainerFinishLatch(container));
+
+ // simulate a long-running container process by having the
+ // container launch thread sleep until it's given a
+ // signal to finish with an exit code.
+ while (!containers.get(container).toProceed) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ return -1;
+ }
+ }
+
+ return containers.get(container).getContainerExitCode();
+ }
+
+ /**
+ * Override signalContainer() so that simulated container processes are
+ * properly cleaned up.
+ */
+ @Override
+ public boolean signalContainer(ContainerSignalContext ctx)
+ throws IOException {
+ containerSucceeded(ctx.getContainer().getContainerId());
+ return true;
+ }
+
+ /**
+ * A signal that container launch threads wait for before exiting in order
+ * to simulate long running container processes.
+ */
+ private static final class ContainerFinishLatch {
+ private volatile boolean toProceed;
+ private int exitCode;
+ private final ContainerId container;
+
+ ContainerFinishLatch(ContainerId containerId) {
+ exitCode = 0;
+ toProceed = false;
+ container = containerId;
+ }
+
+ void toSucceed() {
+ exitCode = 0;
+ toProceed = true;
+ }
+
+ void toFail() {
+ exitCode = -101;
+ toProceed = true;
+ }
+
+ int getContainerExitCode() {
+ // read barrier of toProceed to make sure the exit code is not stale
+ if (toProceed) {
+ LOG.debug(container + " finished with exit code: " + exitCode);
+ }
+ return exitCode;
+ }
+ }
+ }
+
+ /**
+ * A test implementation of container monitor that allows control of current
+ * resource utilization.
+ */
+ private static class ContainerMonitorForOverallocationTest
+ extends ContainersMonitorImpl {
+
+ private volatile ResourceUtilization containerResourceUsage =
+ ResourceUtilization.newInstance(0, 0, 0.0f);
+
+ ContainerMonitorForOverallocationTest(ContainerExecutor exec,
+ AsyncDispatcher dispatcher, Context context) {
+ super(exec, dispatcher, context);
+ }
+
+ @Override
+ public long getPmemAllocatedForContainers() {
+ return NM_CONTAINERS_MEMORY_MB * 1024 * 1024L;
+ }
+
+ @Override
+ public long getVmemAllocatedForContainers() {
+ float pmemRatio = getConfig().getFloat(
+ YarnConfiguration.NM_VMEM_PMEM_RATIO,
+ YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+ return (long) (pmemRatio * getPmemAllocatedForContainers());
+ }
+
+ @Override
+ public long getVCoresAllocatedForContainers() {
+ return NM_CONTAINERS_VCORES;
+ }
+
+ @Override
+ public ResourceUtilization getContainersUtilization() {
+ return containerResourceUsage;
+ }
+
+ @Override
+ protected void checkOverAllocationPrerequisites() throws YarnException {
+ // do nothing
+ }
+
+ public void setContainerResourceUsage(
+ ResourceUtilization containerResourceUsage) {
+ this.containerResourceUsage = containerResourceUsage;
+ }
+ }
+
+ /**
+ * A test implementation of OpportunisticContainerScheduler that allows us to
+ * check if an OPPORTUNISTIC container is queued or not.
+ */
+ private static class OpportunisticContainerSchedulerForTest
+ extends OpportunisticContainerScheduler {
+
+ private final ConcurrentHashSet<ContainerId>
+ queuedOpportunisticContainers = new ConcurrentHashSet<>();
+
+ OpportunisticContainerSchedulerForTest(
+ Context context, AsyncDispatcher dispatcher,
+ NodeManagerMetrics metrics) {
+ super(context, dispatcher, metrics);
+ }
+
+ @Override
+ protected boolean enqueueOpportunisticContainer(Container container) {
+ boolean queued = super.enqueueOpportunisticContainer(container);
+ if (queued) {
+ queuedOpportunisticContainers.add(container.getContainerId());
+ }
+ return queued;
+ }
+
+ public boolean isQueued(ContainerId containerId) {
+ return queuedOpportunisticContainers.contains(containerId);
+ }
+ }
+}