diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index bdd4f7155c1..82627f0f904 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -855,6 +855,82 @@ public void testOpportunisticSchedulerMetrics() throws Exception {
         metrics.getAggregatedReleasedContainers());
   }
 
+  /**
+   * This test showcases the problem in the current Opportunistic Scheduler
+   * that, in case multiple applications ask for Opportunistic containers at
+   * the same point of time, all the containers get allocated on the same
+   * node instead of being spread out across the nodes.
+   * @throws Exception
+   */
+  @Test(timeout = 600000)
+  public void testOpportunisticSchedulerAllocationSkew() throws Exception {
+    List<MockNM> nodeList = new ArrayList<>();
+
+    // Adding 20 nodes to the cluster.
+    for (int i = 0; i < 20; ++i) {
+      String hostname = "h" + i + ":1234";
+      MockNM nm = new MockNM(hostname, 16 * GB, 8,
+          rm.getResourceTrackerService());
+      nodeList.add(nm);
+      nm.registerNode();
+    }
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+
+    List<MockAM> apps = new ArrayList<>();
+    for (int i = 0; i < 10; ++i) {
+      RMApp app = rm.submitApp(GB, "app", "user", null, "default");
+      MockAM am = MockRM.launchAndRegisterAM(app, rm, nodeList.get(i));
+      apps.add(am);
+    }
+
+    for (MockNM nm : nodeList) {
+      nm.nodeHeartbeat(true);
+      RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId());
+      ((RMNodeImpl) rmNode)
+          .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 0));
+      amservice.handle(new NodeAddedSchedulerEvent(rmNode));
+      nm.nodeHeartbeat(true);
+    }
+
+    int numLeastLoadedNodes = rm.getRMContext().getYarnConfiguration().getInt(
+        YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
+        YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED);
+
+    GenericTestUtils.waitFor(() ->
+        amservice.getLeastLoadedNodes().size() == numLeastLoadedNodes,
+        10, 10 * 100);
+
+    // Adding sleep to ensure that OpportunisticContainerAllocator gets
+    // all the nodes.
+    Thread.sleep(1000);
+
+    List<String> allocatedNodesHost = new ArrayList<>();
+
+    for (MockAM app : apps) {
+      AllocateResponse allocateResponse = app.allocate(Arrays.asList(
+          ResourceRequest.newInstance(Priority.newInstance(1), "*",
+              Resources.createResource(2 * GB), 1, true, null,
+              ExecutionTypeRequest
+                  .newInstance(ExecutionType.OPPORTUNISTIC, true))), null);
+
+      List<Container> allocatedContainers = allocateResponse
+          .getAllocatedContainers();
+
+      Assert.assertEquals(1, allocatedContainers.size());
+      Container container = allocatedContainers.get(0);
+      allocatedNodesHost.add(container.getNodeId().getHost());
+    }
+
+    Assert.assertEquals(10, allocatedNodesHost.size());
+    // Check if all containers are allocated on the same node.
+    for (String host : allocatedNodesHost) {
+      Assert.assertEquals("h19", host);
+    }
+  }
+
   @Test(timeout = 60000)
   public void testAMCrashDuringAllocate() throws Exception {
     MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());