diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 562ba5d5062..1a71f6ac09b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -79,6 +79,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair @@ -95,6 +97,9 @@ public class TestApplicationMasterService { private static final Log LOG = LogFactory .getLog(TestApplicationMasterService.class); + private static final String FS_DEFAULT_QUEUE = "root.default"; + private static final String CS_DEFAULT_QUEUE = "default"; + private static final String CUSTOM_RES = "res_1"; private final int GB = 1024; private static YarnConfiguration conf; @@ -193,6 +198,74 @@ public void finishApplicationMaster( } } + private Map initializeMandatoryResources() { + Map riMap = new HashMap<>(); + + ResourceInformation memory = ResourceInformation.newInstance( + ResourceInformation.MEMORY_MB.getName(), + ResourceInformation.MEMORY_MB.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + ResourceInformation vcores = ResourceInformation.newInstance( + ResourceInformation.VCORES.getName(), + ResourceInformation.VCORES.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + + riMap.put(ResourceInformation.MEMORY_URI, memory); + riMap.put(ResourceInformation.VCORES_URI, vcores); + return riMap; + } + + private YarnConfiguration getYarnConfigBySchedulerClass(Class + schedulerCls) { + YarnConfiguration yarnConf; + if (schedulerCls.getCanonicalName() + .equals(CapacityScheduler.class.getCanonicalName())) { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setResourceComparator(DominantResourceCalculator.class); + yarnConf = new YarnConfiguration(csConf); + } else if (schedulerCls.getCanonicalName() + .equals(FairScheduler.class.getCanonicalName())) { + FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration(); + yarnConf = new YarnConfiguration(fsConf); + } else { + throw new IllegalStateException( + "Scheduler class is of wrong type: " + schedulerCls); + } + return yarnConf; + } + + private void requestMemory(MockAM am1, long memory) throws Exception { + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(TestUtils.createResource(memory, 1, + ImmutableMap.of(CUSTOM_RES, 1))) + .numContainers(1) + .resourceName("*") + .build()), null); + } + + private void requestVcores(MockAM am1, int vCores) throws Exception { + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability( + TestUtils.createResource(8 * GB, vCores, + ImmutableMap.of(CUSTOM_RES, 1))) + .numContainers(1) + .resourceName("*") + .build()), null); + } + + private void requestCustomResource(MockAM am1, + ImmutableMap customResource) throws Exception { + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(TestUtils.createResource(8 * GB, 1, customResource)) + .numContainers(1) + .resourceName("*") + .build()), null); + } + + @Before public void setup() { conf = new YarnConfiguration(); @@ -695,41 +768,12 @@ public void testFSValidateRequestCapacityAgainstMinMaxAllocation() private void testValidateRequestCapacityAgainstMinMaxAllocation(Class schedulerCls) throws Exception { - - // Initialize resource map for 2 types. - Map riMap = new HashMap<>(); - - // Initialize mandatory resources - ResourceInformation memory = ResourceInformation.newInstance( - ResourceInformation.MEMORY_MB.getName(), - ResourceInformation.MEMORY_MB.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); - ResourceInformation vcores = ResourceInformation.newInstance( - ResourceInformation.VCORES.getName(), - ResourceInformation.VCORES.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); - riMap.put(ResourceInformation.MEMORY_URI, memory); - riMap.put(ResourceInformation.VCORES_URI, vcores); - + Map riMap = + initializeMandatoryResources(); ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); - final YarnConfiguration yarnConf; - if (schedulerCls.getCanonicalName() - .equals(CapacityScheduler.class.getCanonicalName())) { - CapacitySchedulerConfiguration csConf = - new CapacitySchedulerConfiguration(); - csConf.setResourceComparator(DominantResourceCalculator.class); - yarnConf = new YarnConfiguration(csConf); - } else if (schedulerCls.getCanonicalName() - .equals(FairScheduler.class.getCanonicalName())) { - FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration(); - yarnConf = new YarnConfiguration(fsConf); - } else { - throw new IllegalStateException( - "Scheduler class is of wrong type: " + schedulerCls); - } + final YarnConfiguration yarnConf = + getYarnConfigBySchedulerClass(schedulerCls); // Don't reset resource types since we have already configured resource // types @@ -746,7 +790,7 @@ private void testValidateRequestCapacityAgainstMinMaxAllocation(Class schedul .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, null)); - RMApp app1 = rm.submitApp(GB, "app", "user", null, "default"); + RMApp app1 = rm.submitApp(GB, "app", "user", null, CS_DEFAULT_QUEUE); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); // Now request resource, memory > allowed @@ -781,26 +825,10 @@ private void testValidateRequestCapacityAgainstMinMaxAllocation(Class schedul @Test public void testValidateRequestCapacityAgainstMinMaxAllocationWithDifferentUnits() throws Exception { - - // Initialize resource map for 2 types. - Map riMap = new HashMap<>(); - - // Initialize mandatory resources - ResourceInformation memory = - ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(), - ResourceInformation.MEMORY_MB.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); - ResourceInformation vcores = - ResourceInformation.newInstance(ResourceInformation.VCORES.getName(), - ResourceInformation.VCORES.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + Map riMap = initializeMandatoryResources(); ResourceInformation res1 = - ResourceInformation.newInstance("res_1", "G", 0, 4); - riMap.put(ResourceInformation.MEMORY_URI, memory); - riMap.put(ResourceInformation.VCORES_URI, vcores); - riMap.put("res_1", res1); + ResourceInformation.newInstance(CUSTOM_RES, "G", 0, 4); + riMap.put(CUSTOM_RES, res1); ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); @@ -824,9 +852,9 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationWithDifferentUnits DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, ImmutableMap. builder() - .put("res_1", "5G").build())); + .put(CUSTOM_RES, "5G").build())); - RMApp app1 = rm.submitApp(GB, "app", "user", null, "default"); + RMApp app1 = rm.submitApp(GB, "app", "user", null, CS_DEFAULT_QUEUE); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); // Now request res_1, 500M < 5G so it should be allowed @@ -834,7 +862,7 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationWithDifferentUnits am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() .capability(ResourceTypesTestHelper.newResource(4 * GB, 1, ImmutableMap. builder() - .put("res_1", "500M") + .put(CUSTOM_RES, "500M") .build())) .numContainers(1).resourceName("*").build()), null); } catch (InvalidResourceRequestException e) { @@ -846,28 +874,12 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationWithDifferentUnits } @Test(timeout = 300000) - public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypes() + public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypesCS() throws Exception { - - // Initialize resource map for 2 types. - Map riMap = new HashMap<>(); - - // Initialize mandatory resources - ResourceInformation memory = ResourceInformation.newInstance( - ResourceInformation.MEMORY_MB.getName(), - ResourceInformation.MEMORY_MB.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); - ResourceInformation vcores = ResourceInformation.newInstance( - ResourceInformation.VCORES.getName(), - ResourceInformation.VCORES.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); - ResourceInformation res1 = ResourceInformation.newInstance("res_1", + Map riMap = initializeMandatoryResources(); + ResourceInformation res1 = ResourceInformation.newInstance(CUSTOM_RES, ResourceInformation.VCORES.getUnits(), 0, 4); - riMap.put(ResourceInformation.MEMORY_URI, memory); - riMap.put(ResourceInformation.VCORES_URI, vcores); - riMap.put("res_1", res1); + riMap.put(CUSTOM_RES, res1); ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); @@ -888,60 +900,96 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceType rm.start(); CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - LeafQueue leafQueue = (LeafQueue) cs.getQueue("default"); + LeafQueue leafQueue = (LeafQueue) cs.getQueue(CS_DEFAULT_QUEUE); MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, - ImmutableMap.of("res_1", 4))); + ImmutableMap.of(CUSTOM_RES, 4))); - RMApp app1 = rm.submitApp(GB, "app", "user", null, "default"); + RMApp app1 = rm.submitApp(GB, "app", "user", null, CS_DEFAULT_QUEUE); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); Assert.assertEquals(Resource.newInstance(GB, 1), leafQueue.getUsedResources()); // Now request resource, memory > allowed - boolean exception = false; try { - am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() - .capability(TestUtils.createResource(9 * GB, 1, - ImmutableMap.of("res_1", 1))) - .numContainers(1) - .resourceName("*") - .build()), null); - } catch (InvalidResourceRequestException e) { - exception = true; - } - Assert.assertTrue(exception); + requestMemory(am1, 9 * GB); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) {} - exception = false; try { // Now request resource, vcores > allowed - am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() - .capability( - TestUtils.createResource(8 * GB, 18, ImmutableMap.of("res_1", 1))) - .numContainers(1) - .resourceName("*") - .build()), null); - } catch (InvalidResourceRequestException e) { - exception = true; - } - Assert.assertTrue(exception); + requestVcores(am1, 18); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) {} - exception = false; try { // Now request resource, res_1 > allowed - am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() - .capability(TestUtils.createResource(8 * GB, 1, - ImmutableMap.of("res_1", 100))) - .numContainers(1) - .resourceName("*") - .build()), null); - } catch (InvalidResourceRequestException e) { - exception = true; + requestCustomResource(am1, ImmutableMap.of(CUSTOM_RES, 100)); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) {} + + rm.close(); + } + + @Test(timeout = 300000) + public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypesFS() + throws Exception { + Map riMap = initializeMandatoryResources(); + ResourceInformation res_1 = ResourceInformation.newInstance(CUSTOM_RES, + ResourceInformation.VCORES.getUnits(), 0, 4); + riMap.put(CUSTOM_RES, res_1); + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration(); + + YarnConfiguration yarnConf = new YarnConfiguration(fsConf); + yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + + // Don't reset resource types since we have already configured resource + // types + yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, + false); + MockRM rm = new MockRM(yarnConf); + rm.start(); + + FairScheduler fs = (FairScheduler) rm.getResourceScheduler(); + FSLeafQueue leafQueue = + fs.getQueueManager().getLeafQueue(FS_DEFAULT_QUEUE, false); + + MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils + .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + ImmutableMap.of(CUSTOM_RES, 4))); + + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, FS_DEFAULT_QUEUE); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + Assert.assertEquals(Resource.newInstance(1 * GB, 1), + leafQueue.getResourceUsage()); + + // Now request resource, memory > allowed + try { + requestMemory(am1, 9 * GB); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) { } - Assert.assertTrue(exception); + + try { + // Now request resource, vcores > allowed + requestVcores(am1, 18); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) { } + + try { + // Now request resource, res_1 > allowed + requestCustomResource(am1, ImmutableMap.of(CUSTOM_RES, 100)); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) {} rm.close(); }