diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
new file mode 100644
index 0000000..866e63d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.LinkedHashMap;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.api.support.membermodification.MemberModifier;
+import org.powermock.reflect.Whitebox;
+
+/**
+ * Tests to verify that the {@link ContainerScheduler} is able to
+ * recover active containers based on RecoveredContainerStatus and
+ * ExecutionType.
+ */
+public class TestContainerSchedulerRecovery {
+
+ @Mock NMContext context;
+
+ @Mock NodeManagerMetrics metrics;
+
+ @Mock AsyncDispatcher dispatcher;
+
+ @Mock ContainerTokenIdentifier token;
+
+ @Mock ContainerImpl container;
+
+ @Mock ApplicationId appId;
+
+ @Mock ApplicationAttemptId appAttemptId;
+
+ @Mock ContainerId containerId;
+
+ @Mock AllocationBasedResourceUtilizationTracker
+ allocationBasedResourceUtilizationTracker;
+
+ @InjectMocks ContainerScheduler tempContainerScheduler =
+ new ContainerScheduler(context, dispatcher, metrics, 0);
+
+ ContainerScheduler spy;
+
+ @Before public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ spy = PowerMockito.spy(tempContainerScheduler);
+ PowerMockito.when(container.getContainerId()).thenReturn(containerId);
+ PowerMockito.when(containerId.getApplicationAttemptId())
+ .thenReturn(appAttemptId);
+ PowerMockito.when(containerId.getApplicationAttemptId().getApplicationId())
+ .thenReturn(appId);
+ PowerMockito.when(containerId.getContainerId()).thenReturn(123L);
+ PowerMockito.doNothing().when(allocationBasedResourceUtilizationTracker)
+ .addContainerResources(container);
+ MemberModifier.field(ContainerScheduler.class, "utilizationTracker")
+ .set(spy, allocationBasedResourceUtilizationTracker);
+ }
+
+ @After public void tearDown() {
+ }
+
+ /*Test if a container is recovered as QUEUED, GUARANTEED,
+ * it should be added to queuedGuaranteedContainers map.
+ * */
+ @Test public void testRecoverContainerQueuedGuaranteed()
+ throws IllegalArgumentException, IllegalAccessException {
+ LinkedHashMap tempQueuedGuaranteedContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedGuaranteedContainers");
+ LinkedHashMap tempQueuedOpportunisticContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedOpportunisticContainers");
+ LinkedHashMap tempRunningContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "runningContainers");
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
+ PowerMockito.when(token.getExecutionType())
+ .thenReturn(ExecutionType.GUARANTEED);
+ PowerMockito.when(container.getContainerTokenIdentifier())
+ .thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+ assertEquals(1, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
+ }
+
+ /*Test if a container is recovered as QUEUED, OPPORTUNISTIC,
+ * it should be added to queuedOpportunisticContainers map.
+ * */
+ @Test public void testRecoverContainerQueuedOpportunistic()
+ throws IllegalArgumentException, IllegalAccessException {
+ LinkedHashMap tempQueuedGuaranteedContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedGuaranteedContainers");
+ LinkedHashMap tempQueuedOpportunisticContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedOpportunisticContainers");
+ LinkedHashMap tempRunningContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "runningContainers");
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
+ PowerMockito.when(token.getExecutionType())
+ .thenReturn(ExecutionType.OPPORTUNISTIC);
+ PowerMockito.when(container.getContainerTokenIdentifier())
+ .thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(1, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
+ }
+
+ /*Test if a container is recovered as PAUSED, GUARANTEED,
+ * it should be added to queuedGuaranteedContainers map.
+ * */
+ @Test public void testRecoverContainerPausedGuaranteed()
+ throws IllegalArgumentException, IllegalAccessException {
+ LinkedHashMap tempQueuedGuaranteedContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedGuaranteedContainers");
+ LinkedHashMap tempQueuedOpportunisticContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedOpportunisticContainers");
+ LinkedHashMap tempRunningContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "runningContainers");
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
+ PowerMockito.when(token.getExecutionType())
+ .thenReturn(ExecutionType.GUARANTEED);
+ PowerMockito.when(container.getContainerTokenIdentifier())
+ .thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+ assertEquals(1, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
+ }
+
+ /*Test if a container is recovered as PAUSED, OPPORTUNISTIC,
+ * it should be added to queuedOpportunisticContainers map.
+ * */
+ @Test public void testRecoverContainerPausedOpportunistic()
+ throws IllegalArgumentException, IllegalAccessException {
+ LinkedHashMap tempQueuedGuaranteedContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedGuaranteedContainers");
+ LinkedHashMap tempQueuedOpportunisticContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedOpportunisticContainers");
+ LinkedHashMap tempRunningContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "runningContainers");
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
+ PowerMockito.when(token.getExecutionType())
+ .thenReturn(ExecutionType.OPPORTUNISTIC);
+ PowerMockito.when(container.getContainerTokenIdentifier())
+ .thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(1, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
+ }
+
+ /*Test if a container is recovered as LAUNCHED, GUARANTEED,
+ * it should be added to runningContainers map.
+ * */
+ @Test public void testRecoverContainerLaunchedGuaranteed()
+ throws IllegalArgumentException, IllegalAccessException {
+ LinkedHashMap tempQueuedGuaranteedContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedGuaranteedContainers");
+ LinkedHashMap tempQueuedOpportunisticContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedOpportunisticContainers");
+ LinkedHashMap tempRunningContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "runningContainers");
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED;
+ PowerMockito.when(token.getExecutionType())
+ .thenReturn(ExecutionType.GUARANTEED);
+ PowerMockito.when(container.getContainerTokenIdentifier())
+ .thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(1, tempRunningContainers.size());
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1))
+ .addContainerResources(container);
+ }
+
+ /*Test if a container is recovered as LAUNCHED, OPPORTUNISTIC,
+ * it should be added to runningContainers map.
+ * */
+ @Test public void testRecoverContainerLaunchedOpportunistic()
+ throws IllegalArgumentException, IllegalAccessException {
+ LinkedHashMap tempQueuedGuaranteedContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedGuaranteedContainers");
+ LinkedHashMap tempQueuedOpportunisticContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedOpportunisticContainers");
+ LinkedHashMap tempRunningContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "runningContainers");
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED;
+ PowerMockito.when(token.getExecutionType())
+ .thenReturn(ExecutionType.OPPORTUNISTIC);
+ PowerMockito.when(container.getContainerTokenIdentifier())
+ .thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(1, tempRunningContainers.size());
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1))
+ .addContainerResources(container);
+ }
+
+ /*Test if a container is recovered as REQUESTED, GUARANTEED,
+ * it should not be added to any map mentioned below.
+ * */
+ @Test public void testRecoverContainerRequestedGuaranteed()
+ throws IllegalArgumentException, IllegalAccessException {
+ LinkedHashMap tempQueuedGuaranteedContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedGuaranteedContainers");
+ LinkedHashMap tempQueuedOpportunisticContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedOpportunisticContainers");
+ LinkedHashMap tempRunningContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "runningContainers");
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED;
+ PowerMockito.when(token.getExecutionType())
+ .thenReturn(ExecutionType.GUARANTEED);
+ PowerMockito.when(container.getContainerTokenIdentifier())
+ .thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
+ }
+
+ /*Test if a container is recovered as REQUESTED, OPPORTUNISTIC,
+ * it should not be added to any map mentioned below.
+ * */
+ @Test public void testRecoverContainerRequestedOpportunistic()
+ throws IllegalArgumentException, IllegalAccessException {
+ LinkedHashMap tempQueuedGuaranteedContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedGuaranteedContainers");
+ LinkedHashMap tempQueuedOpportunisticContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedOpportunisticContainers");
+ LinkedHashMap tempRunningContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "runningContainers");
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED;
+ PowerMockito.when(token.getExecutionType())
+ .thenReturn(ExecutionType.OPPORTUNISTIC);
+ PowerMockito.when(container.getContainerTokenIdentifier())
+ .thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
+ }
+
+ /*Test if a container is recovered as COMPLETED, GUARANTEED,
+ * it should not be added to any map mentioned below.
+ * */
+ @Test public void testRecoverContainerCompletedGuaranteed()
+ throws IllegalArgumentException, IllegalAccessException {
+ LinkedHashMap tempQueuedGuaranteedContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedGuaranteedContainers");
+ LinkedHashMap tempQueuedOpportunisticContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedOpportunisticContainers");
+ LinkedHashMap tempRunningContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "runningContainers");
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED;
+ PowerMockito.when(token.getExecutionType())
+ .thenReturn(ExecutionType.GUARANTEED);
+ PowerMockito.when(container.getContainerTokenIdentifier())
+ .thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
+ }
+
+ /*Test if a container is recovered as COMPLETED, OPPORTUNISTIC,
+ * it should not be added to any map mentioned below.
+ * */
+ @Test public void testRecoverContainerCompletedOpportunistic()
+ throws IllegalArgumentException, IllegalAccessException {
+ LinkedHashMap tempQueuedGuaranteedContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedGuaranteedContainers");
+ LinkedHashMap tempQueuedOpportunisticContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedOpportunisticContainers");
+ LinkedHashMap tempRunningContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "runningContainers");
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED;
+ PowerMockito.when(token.getExecutionType())
+ .thenReturn(ExecutionType.OPPORTUNISTIC);
+ PowerMockito.when(container.getContainerTokenIdentifier())
+ .thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
+ }
+
+ /*Test if a container is recovered as GUARANTEED but no executionType set,
+ * it should not be added to any map mentioned below.
+ * */
+ @Test public void testContainerQueuedNoExecType()
+ throws IllegalArgumentException, IllegalAccessException {
+ LinkedHashMap tempQueuedGuaranteedContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedGuaranteedContainers");
+ LinkedHashMap tempQueuedOpportunisticContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedOpportunisticContainers");
+ LinkedHashMap tempRunningContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "runningContainers");
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
+ PowerMockito.when(container.getContainerTokenIdentifier())
+ .thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
+ }
+
+ /*Test if a container is recovered as PAUSED but no executionType set,
+ * it should not be added to any map mentioned below.
+ * */
+ @Test public void testContainerPausedNoExecType()
+ throws IllegalArgumentException, IllegalAccessException {
+ LinkedHashMap tempQueuedGuaranteedContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedGuaranteedContainers");
+ LinkedHashMap tempQueuedOpportunisticContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "queuedOpportunisticContainers");
+ LinkedHashMap tempRunningContainers =
+ (LinkedHashMap) Whitebox
+ .getInternalState(spy, "runningContainers");
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
+ PowerMockito.when(container.getContainerTokenIdentifier())
+ .thenReturn(token);
+ spy.recoverActiveContainer(container, rcs);
+ assertEquals(0, tempQueuedGuaranteedContainers.size());
+ assertEquals(0, tempQueuedOpportunisticContainers.size());
+ assertEquals(0, tempRunningContainers.size());
+ Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0))
+ .addContainerResources(container);
+ }
+}
\ No newline at end of file