diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index fe3a889af9d..3543bc4707e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; @@ -233,6 +234,13 @@ public NodeHeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy, int resId) throws Exception { ArrayList updatedStats = new ArrayList(); @@ -243,15 +251,62 @@ public NodeHeartbeatResponse nodeHeartbeat(Map updatedStats, boolean isHealthy) throws Exception { return nodeHeartbeat(updatedStats, Collections.emptyList(), isHealthy, responseId); } + /** + * Sends the heartbeat of the node. + * @param oppContainersStatus opportunistic containers status. + * @param isHealthy whether node is healthy. + * @return response of the heartbeat. + * @throws Exception + */ + public NodeHeartbeatResponse nodeHeartbeat( + OpportunisticContainersStatus oppContainersStatus, boolean isHealthy) + throws Exception { + return nodeHeartbeat(Collections.emptyList(), + Collections.emptyList(), isHealthy, responseId, oppContainersStatus); + } + + /** + * Sends the heartbeat of the node. + * @param updatedStats containers with updated status. + * @param increasedConts containers whose resource has been increased. + * @param isHealthy whether node is healthy. + * @param resId response id. + * @return response of the heartbeat. + * @throws Exception + */ + public NodeHeartbeatResponse nodeHeartbeat( + List updatedStats, List increasedConts, + boolean isHealthy, int resId) throws Exception { + return nodeHeartbeat(updatedStats, increasedConts, + isHealthy, resId, null); + } + + /** + * Sends the heartbeat of the node. + * @param updatedStats containers with updated status. + * @param increasedConts containers whose resource has been increased. + * @param isHealthy whether node is healthy. + * @param resId response id. + * @param oppContainersStatus opportunistic containers status. + * @return response of the heartbeat. + * @throws Exception + */ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, - List increasedConts, boolean isHealthy, int resId) - throws Exception { + List increasedConts, boolean isHealthy, int resId, + OpportunisticContainersStatus oppContainersStatus) throws Exception { NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeStatus status = Records.newRecord(NodeStatus.class); status.setResponseId(resId); @@ -269,6 +324,7 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, containerStats.remove(cid); } status.setIncreasedContainers(increasedConts); + status.setOpportunisticContainersStatus(oppContainersStatus); NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class); healthStatus.setHealthReport(""); healthStatus.setIsNodeHealthy(isHealthy); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index bdd4f7155c1..70782440423 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @@ -90,9 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -103,7 +100,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import java.io.IOException; import java.net.InetSocketAddress; @@ -122,6 +118,9 @@ private MockRM rm; private DrainDispatcher dispatcher; + private OpportunisticContainersStatus oppContainersStatus = + getOpportunisticStatus(); + @Before public void createAndStartRM() { CapacitySchedulerConfiguration csConf = @@ -184,38 +183,24 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception nm3.registerNode(); nm4.registerNode(); + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); + nm3.nodeHeartbeat(oppContainersStatus, true); + nm4.nodeHeartbeat(oppContainersStatus, true); + OpportunisticContainerAllocatorAMService amservice = (OpportunisticContainerAllocatorAMService) rm .getApplicationMasterService(); RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); - ApplicationAttemptId attemptId = - app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); ResourceScheduler scheduler = rm.getResourceScheduler(); - RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); - RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); - RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId()); - RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId()); - - nm1.nodeHeartbeat(true); - nm2.nodeHeartbeat(true); - nm3.nodeHeartbeat(true); - nm4.nodeHeartbeat(true); - - // Send add and update node events to AM Service. - amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); - amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); - amservice.handle(new NodeAddedSchedulerEvent(rmNode3)); - amservice.handle(new NodeAddedSchedulerEvent(rmNode4)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode3)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode4)); + // All nodes 1 - 4 will be applicable for scheduling. - nm1.nodeHeartbeat(true); - nm2.nodeHeartbeat(true); - nm3.nodeHeartbeat(true); - nm4.nodeHeartbeat(true); + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); + nm3.nodeHeartbeat(oppContainersStatus, true); + nm4.nodeHeartbeat(oppContainersStatus, true); GenericTestUtils.waitFor(() -> amservice.getLeastLoadedNodes().size() == 4, 10, 10 * 100); @@ -253,7 +238,7 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null, ExecutionType.GUARANTEED))); // Node on same host should not result in allocation - sameHostDiffNode.nodeHeartbeat(true); + sameHostDiffNode.nodeHeartbeat(oppContainersStatus, true); rm.drainEvents(); allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); @@ -296,7 +281,7 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception .getUpdateContainerRequest().getContainerId()); // Ensure after correct node heartbeats, we should get the allocation - allocNode.nodeHeartbeat(true); + allocNode.nodeHeartbeat(oppContainersStatus, true); rm.drainEvents(); allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); @@ -310,10 +295,10 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception // Allocated cores+mem should have increased, available should decrease verifyMetrics(metrics, 14336, 14, 2048, 2, 2); - nm1.nodeHeartbeat(true); - nm2.nodeHeartbeat(true); - nm3.nodeHeartbeat(true); - nm4.nodeHeartbeat(true); + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); + nm3.nodeHeartbeat(oppContainersStatus, true); + nm4.nodeHeartbeat(oppContainersStatus, true); rm.drainEvents(); // Verify that the container is still in ACQUIRED state wrt the RM. @@ -352,36 +337,20 @@ public void testContainerPromoteAfterContainerStart() throws Exception { nm1.registerNode(); nm2.registerNode(); + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); + OpportunisticContainerAllocatorAMService amservice = (OpportunisticContainerAllocatorAMService) rm .getApplicationMasterService(); RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); - ApplicationAttemptId attemptId = - app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); ResourceScheduler scheduler = rm.getResourceScheduler(); - RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); - RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); - - nm1.nodeHeartbeat(true); - nm2.nodeHeartbeat(true); - - ((RMNodeImpl) rmNode1) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); - ((RMNodeImpl) rmNode2) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); - - OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler) - .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); - // Send add and update node events to AM Service. - amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); - amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); // All nodes 1 to 2 will be applicable for scheduling. - nm1.nodeHeartbeat(true); - nm2.nodeHeartbeat(true); + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); GenericTestUtils.waitFor(() -> amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100); @@ -478,36 +447,21 @@ public void testContainerPromoteAfterContainerComplete() throws Exception { nm1.registerNode(); nm2.registerNode(); + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); + OpportunisticContainerAllocatorAMService amservice = (OpportunisticContainerAllocatorAMService) rm .getApplicationMasterService(); RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); - ApplicationAttemptId attemptId = - app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); ResourceScheduler scheduler = rm.getResourceScheduler(); - RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); - RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); - - nm1.nodeHeartbeat(true); - nm2.nodeHeartbeat(true); - ((RMNodeImpl) rmNode1) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); - ((RMNodeImpl) rmNode2) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); - - OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler) - .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); - // Send add and update node events to AM Service. - amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); - amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); // All nodes 1 to 2 will be applicable for scheduling. - nm1.nodeHeartbeat(true); - nm2.nodeHeartbeat(true); + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); GenericTestUtils.waitFor(() -> amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100); @@ -591,30 +545,17 @@ public void testContainerAutoUpdateContainer() throws Exception { createAndStartRMWithAutoUpdateContainer(); MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); nm1.registerNode(); + nm1.nodeHeartbeat(oppContainersStatus, true); OpportunisticContainerAllocatorAMService amservice = (OpportunisticContainerAllocatorAMService) rm .getApplicationMasterService(); RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); - ApplicationAttemptId attemptId = - app1.getCurrentAppAttempt().getAppAttemptId(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); ResourceScheduler scheduler = rm.getResourceScheduler(); RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); - nm1.nodeHeartbeat(true); - - ((RMNodeImpl) rmNode1) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); - - OpportunisticContainerContext ctxt = - ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId) - .getOpportunisticContainerContext(); - // Send add and update node events to AM Service. - amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); - - nm1.nodeHeartbeat(true); + nm1.nodeHeartbeat(oppContainersStatus, true); GenericTestUtils.waitFor(() -> amservice.getLeastLoadedNodes().size() == 1, 10, 10 * 100); @@ -713,7 +654,7 @@ public void testContainerAutoUpdateContainer() throws Exception { Assert.assertEquals(Resource.newInstance(1 * GB, 1), response.getContainersToUpdate().get(0).getResource()); - nm1.nodeHeartbeat(true); + nm1.nodeHeartbeat(oppContainersStatus, true); // DEMOTE the container allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList( UpdateContainerRequest.newInstance(3, container.getId(), @@ -735,7 +676,7 @@ public void testContainerAutoUpdateContainer() throws Exception { uc.getContainer().getExecutionType()); // Check that the container is updated in NM through NM heartbeat response if (response.getContainersToUpdate().size() == 0) { - response = nm1.nodeHeartbeat(true); + response = nm1.nodeHeartbeat(oppContainersStatus, true); } Assert.assertEquals(1, response.getContainersToUpdate().size()); Assert.assertEquals(ExecutionType.OPPORTUNISTIC, @@ -761,6 +702,10 @@ public void testOpportunisticSchedulerMetrics() throws Exception { nodes.put(nm2.getNodeId(), nm2); nm1.registerNode(); nm2.registerNode(); + + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); + OpportunisticSchedulerMetrics metrics = OpportunisticSchedulerMetrics.getMetrics(); @@ -777,28 +722,10 @@ public void testOpportunisticSchedulerMetrics() throws Exception { app1.getCurrentAppAttempt().getAppAttemptId(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); ResourceScheduler scheduler = rm.getResourceScheduler(); - RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); - RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); - - nm1.nodeHeartbeat(true); - nm2.nodeHeartbeat(true); - - ((RMNodeImpl) rmNode1) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); - ((RMNodeImpl) rmNode2) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); - - OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler) - .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); - // Send add and update node events to AM Service. - amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); - amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); // All nodes 1 to 2 will be applicable for scheduling. - nm1.nodeHeartbeat(true); - nm2.nodeHeartbeat(true); + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); GenericTestUtils.waitFor(() -> amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100); @@ -890,6 +817,10 @@ public void testNodeRemovalDuringAllocate() throws Exception { MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); nm1.registerNode(); nm2.registerNode(); + + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); + OpportunisticContainerAllocatorAMService amservice = (OpportunisticContainerAllocatorAMService) rm .getApplicationMasterService(); @@ -900,20 +831,14 @@ public void testNodeRemovalDuringAllocate() throws Exception { ResourceScheduler scheduler = rm.getResourceScheduler(); RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); - nm1.nodeHeartbeat(true); - nm2.nodeHeartbeat(true); - ((RMNodeImpl) rmNode1) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); - ((RMNodeImpl) rmNode2) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler) .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); - // Send add and update node events to AM Service. - amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); - amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); + // Both node 1 and node 2 will be applicable for scheduling. + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); + for (int i = 0; i < 10; i++) { am1.allocate( Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), @@ -948,10 +873,10 @@ public void testNodeRemovalDuringAllocate() throws Exception { @Test(timeout = 60000) public void testAppAttemptRemovalAfterNodeRemoval() throws Exception { MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService()); + nm.registerNode(); - OpportunisticContainerAllocatorAMService amservice = - (OpportunisticContainerAllocatorAMService) rm - .getApplicationMasterService(); + nm.nodeHeartbeat(oppContainersStatus, true); + RMApp app = rm.submitApp(1 * GB, "app", "user", null, "default"); ApplicationAttemptId attemptId = app.getCurrentAppAttempt().getAppAttemptId(); @@ -960,12 +885,8 @@ public void testAppAttemptRemovalAfterNodeRemoval() throws Exception { SchedulerApplicationAttempt schedulerAttempt = ((CapacityScheduler)scheduler).getApplicationAttempt(attemptId); RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm.getNodeId()); - nm.nodeHeartbeat(true); - ((RMNodeImpl) rmNode1) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); - // Send add and update node events to AM Service. - amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); - amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + nm.nodeHeartbeat(oppContainersStatus, true); GenericTestUtils.waitFor(() -> scheduler.getNumClusterNodes() == 1, 10, 200 * 100); @@ -1000,13 +921,18 @@ public void testAppAttemptRemovalAfterNodeRemoval() throws Exception { RMAppAttemptState.FAILED, false)); } + private OpportunisticContainersStatus getOpportunisticStatus() { + return getOppurtunisticStatus(-1, 100, 1000); + } + private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime, - int queueLength) { - OpportunisticContainersStatus status1 = - Mockito.mock(OpportunisticContainersStatus.class); - Mockito.when(status1.getEstimatedQueueWaitTime()).thenReturn(waitTime); - Mockito.when(status1.getWaitQueueLength()).thenReturn(queueLength); - return status1; + int queueLength, int queueCapacity) { + OpportunisticContainersStatus status = + OpportunisticContainersStatus.newInstance(); + status.setEstimatedQueueWaitTime(waitTime); + status.setOpportQueueCapacity(queueCapacity); + status.setWaitQueueLength(queueLength); + return status; } // Test if the OpportunisticContainerAllocatorAMService can handle both