diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java new file mode 100644 index 00000000000..9adab2b552a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java @@ -0,0 +1,619 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import com.google.common.collect.ImmutableMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.lang.Thread.sleep; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES; +import static org.junit.Assert.fail; + +public abstract class ApplicationMasterServiceTestBase { + private static final Log LOG = LogFactory + .getLog(ApplicationMasterServiceTestBase.class); + + protected final int GB = 1024; + + static final String FS_DEFAULT_QUEUE = "root.default"; + static final String CS_DEFAULT_QUEUE = "default"; + static final String CUSTOM_RES = "res_1"; + static final String DEFAULT_HOST = "127.0.0.1"; + static final String DEFAULT_PORT = "1234"; + + protected static YarnConfiguration conf; + + static AtomicInteger beforeRegCount = new AtomicInteger(0); + static AtomicInteger afterRegCount = new AtomicInteger(0); + static AtomicInteger beforeAllocCount = new AtomicInteger(0); + static AtomicInteger afterAllocCount = new AtomicInteger(0); + static AtomicInteger beforeFinishCount = new AtomicInteger(0); + static AtomicInteger afterFinishCount = new AtomicInteger(0); + static AtomicInteger initCount = new AtomicInteger(0); + + protected abstract YarnConfiguration createYarnConfig(); + + protected abstract Resource getResourceUsageForQueue(ResourceManager rm, + String queue); + + //TODO search for this as this is duplicated + void sentRMContainerLaunched(MockRM rm, ContainerId containerId) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + RMContainer rmContainer = cs.getRMContainer(containerId); + if (rmContainer != null) { + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); + } else { + fail("Cannot find RMContainer"); + } + } + + Map initializeMandatoryResources() { + Map riMap = new HashMap<>(); + + ResourceInformation memory = ResourceInformation.newInstance( + ResourceInformation.MEMORY_MB.getName(), + ResourceInformation.MEMORY_MB.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + ResourceInformation vcores = ResourceInformation.newInstance( + ResourceInformation.VCORES.getName(), + ResourceInformation.VCORES.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + + riMap.put(ResourceInformation.MEMORY_URI, memory); + riMap.put(ResourceInformation.VCORES_URI, vcores); + return riMap; + } + + private void requestMemory(MockAM am1, long memory) throws Exception { + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(TestUtils.createResource(memory, 1, + ImmutableMap.of(CUSTOM_RES, 1))) + .numContainers(1) + .resourceName("*") + .build()), null); + } + + private void requestVcores(MockAM am1, int vCores) throws Exception { + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability( + TestUtils.createResource(8 * GB, vCores, + ImmutableMap.of(CUSTOM_RES, 1))) + .numContainers(1) + .resourceName("*") + .build()), null); + } + + private void requestCustomResource(MockAM am1, + ImmutableMap customResource) throws Exception { + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(TestUtils.createResource(8 * GB, 1, customResource)) + .numContainers(1) + .resourceName("*") + .build()), null); + } + + @Before + public void setup() { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + ResourceScheduler.class); + } + + @Test(timeout = 3000000) + public void testRMIdentifierOnContainerAllocation() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode(DEFAULT_HOST + ":" + DEFAULT_PORT, 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(2048); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + am1.addRequests(new String[] {DEFAULT_HOST}, GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + // kick the scheduler + nm1.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + sleep(1000); + alloc1Response = am1.schedule(); + } + + // assert RMIdentifier is set properly in allocated containers + Container allocatedContainer = + alloc1Response.getAllocatedContainers().get(0); + ContainerTokenIdentifier tokenId = + BuilderUtils.newContainerTokenIdentifier(allocatedContainer + .getContainerToken()); + Assert.assertEquals(MockRM.getClusterTimeStamp(), + tokenId.getRMIdentifier()); + rm.stop(); + } + + @Test(timeout = 3000000) + public void testAllocateResponseIdOverflow() throws Exception { + MockRM rm = new MockRM(conf); + + try { + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode(DEFAULT_HOST + ":" + DEFAULT_PORT, 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(2048); + + // kick off the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + // Set the last responseId to be Integer.MAX_VALUE + Assert.assertTrue(am1.setApplicationLastResponseId(Integer.MAX_VALUE)); + + // Both allocate should succeed + am1.schedule(); // send allocate with responseId = Integer.MAX_VALUE + Assert.assertEquals(0, am1.getResponseId()); + + am1.schedule(); // send allocate with responseId = 0 + Assert.assertEquals(1, am1.getResponseId()); + + } finally { + rm.stop(); + } + } + + @Test(timeout=600000) + public void testInvalidContainerReleaseRequest() throws Exception { + MockRM rm = new MockRM(conf); + + try { + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode(DEFAULT_HOST + ":" + DEFAULT_PORT, 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(1024); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + am1.addRequests(new String[] {DEFAULT_HOST}, GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + // kick the scheduler + nm1.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + sleep(1000); + alloc1Response = am1.schedule(); + } + + Assert.assertTrue(alloc1Response.getAllocatedContainers().size() > 0); + + RMApp app2 = rm.submitApp(1024); + + nm1.nodeHeartbeat(true); + RMAppAttempt attempt2 = app2.getCurrentAppAttempt(); + MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId()); + am2.registerAppAttempt(); + + // Now trying to release container allocated for app1 -> appAttempt1. + ContainerId cId = alloc1Response.getAllocatedContainers().get(0).getId(); + am2.addContainerToBeReleased(cId); + try { + am2.schedule(); + fail("Exception was expected!!"); + } catch (InvalidContainerReleaseException e) { + StringBuilder sb = new StringBuilder("Cannot release container : "); + sb.append(cId.toString()); + sb.append(" not belonging to this application attempt : "); + sb.append(attempt2.getAppAttemptId().toString()); + Assert.assertTrue(e.getMessage().contains(sb.toString())); + } + } finally { + rm.stop(); + } + } + + @Test(timeout=1200000) + public void testProgressFilter() throws Exception{ + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode(DEFAULT_HOST + ":" + DEFAULT_PORT, 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(2048); + + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); + List release = new ArrayList<>(); + List ask = new ArrayList<>(); + allocateRequest.setReleaseList(release); + allocateRequest.setAskList(ask); + + allocateRequest.setProgress(Float.POSITIVE_INFINITY); + am1.allocate(allocateRequest); + while(attempt1.getProgress()!=1){ + LOG.info("Waiting for allocate event to be handled ..."); + sleep(100); + } + + allocateRequest.setProgress(Float.NaN); + am1.allocate(allocateRequest); + while(attempt1.getProgress()!=0){ + LOG.info("Waiting for allocate event to be handled ..."); + sleep(100); + } + + allocateRequest.setProgress((float)9); + am1.allocate(allocateRequest); + while(attempt1.getProgress()!=1){ + LOG.info("Waiting for allocate event to be handled ..."); + sleep(100); + } + + allocateRequest.setProgress(Float.NEGATIVE_INFINITY); + am1.allocate(allocateRequest); + while(attempt1.getProgress()!=0){ + LOG.info("Waiting for allocate event to be handled ..."); + sleep(100); + } + + allocateRequest.setProgress((float)0.5); + am1.allocate(allocateRequest); + while(attempt1.getProgress()!=0.5){ + LOG.info("Waiting for allocate event to be handled ..."); + sleep(100); + } + + allocateRequest.setProgress((float)-1); + am1.allocate(allocateRequest); + while(attempt1.getProgress()!=0){ + LOG.info("Waiting for allocate event to be handled ..."); + sleep(100); + } + } + + @Test(timeout=1200000) + public void testFinishApplicationMasterBeforeRegistering() throws Exception { + MockRM rm = new MockRM(conf); + + try { + rm.start(); + // Register node1 + MockNM nm1 = rm.registerNode(DEFAULT_HOST + ":" + DEFAULT_PORT, 6 * GB); + // Submit an application + RMApp app1 = rm.submitApp(2048); + MockAM am1 = MockRM.launchAM(app1, rm, nm1); + FinishApplicationMasterRequest req = + FinishApplicationMasterRequest.newInstance( + FinalApplicationStatus.FAILED, "", ""); + try { + am1.unregisterAppAttempt(req, false); + fail("ApplicationMasterNotRegisteredException should be thrown"); + } catch (ApplicationMasterNotRegisteredException e) { + Assert.assertNotNull(e); + Assert.assertNotNull(e.getMessage()); + Assert.assertTrue(e.getMessage().contains( + "Application Master is trying to unregister before registering for:" + )); + } catch (Exception e) { + fail("ApplicationMasterNotRegisteredException should be thrown"); + } + + am1.registerAppAttempt(); + + am1.unregisterAppAttempt(req, false); + rm.waitForState(am1.getApplicationAttemptId(), + RMAppAttemptState.FINISHING); + } finally { + rm.stop(); + } + } + + @Test(timeout = 3000000) + public void testResourceTypes() throws Exception { + HashMap> driver = + new HashMap<>(); + + CapacitySchedulerConfiguration csconf = + new CapacitySchedulerConfiguration(); + csconf.setResourceComparator(DominantResourceCalculator.class); + + YarnConfiguration testCapacityDRConf = new YarnConfiguration(csconf); + testCapacityDRConf.setClass(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class, ResourceScheduler.class); + + YarnConfiguration testCapacityDefConf = new YarnConfiguration(); + testCapacityDefConf.setClass(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class, ResourceScheduler.class); + + YarnConfiguration testFairDefConf = new YarnConfiguration(); + testFairDefConf.setClass(YarnConfiguration.RM_SCHEDULER, + FairScheduler.class, ResourceScheduler.class); + + driver.put(conf, + EnumSet.of(YarnServiceProtos.SchedulerResourceTypes.MEMORY)); + driver.put(testCapacityDRConf, + EnumSet.of(YarnServiceProtos.SchedulerResourceTypes.CPU, + YarnServiceProtos.SchedulerResourceTypes.MEMORY)); + driver.put(testCapacityDefConf, + EnumSet.of(YarnServiceProtos.SchedulerResourceTypes.MEMORY)); + driver.put(testFairDefConf, + EnumSet.of(YarnServiceProtos.SchedulerResourceTypes.MEMORY, + YarnServiceProtos.SchedulerResourceTypes.CPU)); + + for (Map.Entry> entry : + driver.entrySet()) { + EnumSet expectedValue = + entry.getValue(); + MockRM rm = new MockRM(entry.getKey()); + rm.start(); + MockNM nm1 = rm.registerNode(DEFAULT_HOST + ":" + DEFAULT_PORT, 6 * GB); + RMApp app1 = rm.submitApp(2048); + //Wait to make sure the attempt has the right state + //TODO explore a better way than sleeping for a while (YARN-4929) + Thread.sleep(1000); + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + RegisterApplicationMasterResponse resp = am1.registerAppAttempt(); + EnumSet types = + resp.getSchedulerResourceTypes(); + LOG.info("types = " + types.toString()); + Assert.assertEquals(expectedValue, types); + rm.stop(); + } + } + + @Test(timeout=1200000) + public void testAllocateAfterUnregister() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + // Register node1 + MockNM nm1 = rm.registerNode(DEFAULT_HOST + ":" + DEFAULT_PORT, 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(2048); + + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + // unregister app attempt + FinishApplicationMasterRequest req = + FinishApplicationMasterRequest.newInstance( + FinalApplicationStatus.KILLED, "", ""); + am1.unregisterAppAttempt(req, false); + // request container after unregister + am1.addRequests(new String[] {DEFAULT_HOST}, GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); + + nm1.nodeHeartbeat(true); + rm.drainEvents(); + alloc1Response = am1.schedule(); + Assert.assertEquals(0, alloc1Response.getAllocatedContainers().size()); + } + + @Test(timeout = 300000) + public void testUpdateTrackingUrl() throws Exception { + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode(DEFAULT_HOST + ":" + DEFAULT_PORT, 6 * GB); + + RMApp app1 = rm.submitApp(2048); + + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + Assert.assertEquals("N/A", rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getOriginalTrackingUrl()); + + AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); + String newTrackingUrl = "hadoop.apache.org"; + allocateRequest.setTrackingUrl(newTrackingUrl); + + am1.allocate(allocateRequest); + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getOriginalTrackingUrl()); + + // Send it again + am1.allocate(allocateRequest); + Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getOriginalTrackingUrl()); + rm.stop(); + } + + protected void testValidateRequestCapacityAgainstMinMaxAllocation() + throws Exception { + Map riMap = + initializeMandatoryResources(); + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + final YarnConfiguration yarnConf = createYarnConfig(); + + // Don't reset resource types since we have already configured resource + // types + yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, + false); + yarnConf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false); + + MockRM rm = new MockRM(yarnConf); + rm.start(); + + MockNM nm1 = rm.registerNode("199.99.99.1:" + DEFAULT_PORT, TestUtils + .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, null)); + + RMApp app1 = rm.submitApp(GB, "app", "user", null, CS_DEFAULT_QUEUE); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + // Now request resource, memory > allowed + boolean exception = false; + try { + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(Resource.newInstance(9 * GB, 1)) + .numContainers(1) + .resourceName("*") + .build()), null); + } catch (InvalidResourceRequestException e) { + exception = true; + } + Assert.assertTrue(exception); + + exception = false; + try { + // Now request resource, vcores > allowed + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(Resource.newInstance(8 * GB, 18)) + .numContainers(1) + .resourceName("*") + .build()), null); + } catch (InvalidResourceRequestException e) { + exception = true; + } + Assert.assertTrue(exception); + + rm.close(); + } + + void testRequestCapacityMinMaxAllocationForResourceTypes( + String queueName) throws Exception { + Map riMap = initializeMandatoryResources(); + ResourceInformation res1 = ResourceInformation.newInstance(CUSTOM_RES, + ResourceInformation.VCORES.getUnits(), 0, 4); + riMap.put(CUSTOM_RES, res1); + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + final YarnConfiguration yarnConf = createYarnConfig(); + // Don't reset resource types since we have already configured resource + // types + yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, + false); + yarnConf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false); + + MockRM rm = new MockRM(yarnConf); + rm.start(); + + MockNM nm1 = rm.registerNode("199.99.99.1:" + DEFAULT_PORT, TestUtils + .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + ImmutableMap.of(CUSTOM_RES, 4))); + + RMApp app1 = rm.submitApp(GB, "app", "user", null, queueName); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + Assert.assertEquals(Resource.newInstance(GB, 1), + getResourceUsageForQueue(rm, queueName)); + + // Now request resource, memory > allowed + try { + requestMemory(am1, 9 * GB); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) {} + + try { + // Now request resource, vcores > allowed + requestVcores(am1, 18); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) {} + + try { + // Now request resource, res_1 > allowed + requestCustomResource(am1, ImmutableMap.of(CUSTOM_RES, 100)); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) {} + + rm.close(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterInterceptor.java new file mode 100644 index 00000000000..2bc33eed9c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterInterceptor.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +import static java.lang.Thread.sleep; + +public class TestApplicationMasterInterceptor extends + ApplicationMasterServiceTestBase { + private static final Log LOG = LogFactory + .getLog(TestApplicationMasterInterceptor.class); + + @Override + protected YarnConfiguration createYarnConfig() { + throw new IllegalStateException( + "This class should not use YarnConfig in its tests!"); + } + + @Override + protected Resource getResourceUsageForQueue(ResourceManager rm, String + queue) { + throw new IllegalStateException( + "This class should not use getResourceUsageForQueue in its tests!"); + } + + static class TestInterceptor1 implements + ApplicationMasterServiceProcessor { + + private ApplicationMasterServiceProcessor nextProcessor; + + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor next) { + initCount.incrementAndGet(); + this.nextProcessor = next; + } + + @Override + public void registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) + throws IOException, YarnException { + nextProcessor.registerApplicationMaster( + applicationAttemptId, request, response); + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, + AllocateResponse response) throws YarnException { + beforeAllocCount.incrementAndGet(); + nextProcessor.allocate(appAttemptId, request, response); + afterAllocCount.incrementAndGet(); + } + + @Override + public void finishApplicationMaster( + ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + beforeFinishCount.incrementAndGet(); + afterFinishCount.incrementAndGet(); + } + } + + static class TestInterceptor2 implements + ApplicationMasterServiceProcessor { + + private ApplicationMasterServiceProcessor nextProcessor; + + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor next) { + initCount.incrementAndGet(); + this.nextProcessor = next; + } + + @Override + public void registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) + throws IOException, YarnException { + beforeRegCount.incrementAndGet(); + nextProcessor.registerApplicationMaster(applicationAttemptId, + request, response); + afterRegCount.incrementAndGet(); + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) + throws YarnException { + beforeAllocCount.incrementAndGet(); + nextProcessor.allocate(appAttemptId, request, response); + afterAllocCount.incrementAndGet(); + } + + @Override + public void finishApplicationMaster( + ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + beforeFinishCount.incrementAndGet(); + nextProcessor.finishApplicationMaster( + applicationAttemptId, request, response); + afterFinishCount.incrementAndGet(); + } + } + + @Test(timeout = 300000) + public void testApplicationMasterInterceptor() throws Exception { + conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS, + TestInterceptor1.class.getName() + "," + + TestInterceptor2.class.getName()); + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(2048); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + int allocCount = 0; + + am1.addRequests(new String[] {"127.0.0.1"}, GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + allocCount++; + + // kick the scheduler + nm1.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + sleep(1000); + alloc1Response = am1.schedule(); + allocCount++; + } + + // assert RMIdentifier is set properly in allocated containers + Container allocatedContainer = + alloc1Response.getAllocatedContainers().get(0); + ContainerTokenIdentifier tokenId = + BuilderUtils.newContainerTokenIdentifier(allocatedContainer + .getContainerToken()); + am1.unregisterAppAttempt(); + + Assert.assertEquals(1, beforeRegCount.get()); + Assert.assertEquals(1, afterRegCount.get()); + + // The allocate calls should be incremented twice + Assert.assertEquals(allocCount * 2, beforeAllocCount.get()); + Assert.assertEquals(allocCount * 2, afterAllocCount.get()); + + // Finish should only be called once, since the FirstInterceptor + // does not forward the call. + Assert.assertEquals(1, beforeFinishCount.get()); + Assert.assertEquals(1, afterFinishCount.get()); + rm.stop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 562ba5d5062..1a71f6ac09b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -79,6 +79,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair @@ -95,6 +97,9 @@ public class TestApplicationMasterService { private static final Log LOG = LogFactory .getLog(TestApplicationMasterService.class); + private static final String FS_DEFAULT_QUEUE = "root.default"; + private static final String CS_DEFAULT_QUEUE = "default"; + private static final String CUSTOM_RES = "res_1"; private final int GB = 1024; private static YarnConfiguration conf; @@ -193,6 +198,74 @@ public void finishApplicationMaster( } } + private Map initializeMandatoryResources() { + Map riMap = new HashMap<>(); + + ResourceInformation memory = ResourceInformation.newInstance( + ResourceInformation.MEMORY_MB.getName(), + ResourceInformation.MEMORY_MB.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + ResourceInformation vcores = ResourceInformation.newInstance( + ResourceInformation.VCORES.getName(), + ResourceInformation.VCORES.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + + riMap.put(ResourceInformation.MEMORY_URI, memory); + riMap.put(ResourceInformation.VCORES_URI, vcores); + return riMap; + } + + private YarnConfiguration getYarnConfigBySchedulerClass(Class + schedulerCls) { + YarnConfiguration yarnConf; + if (schedulerCls.getCanonicalName() + .equals(CapacityScheduler.class.getCanonicalName())) { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setResourceComparator(DominantResourceCalculator.class); + yarnConf = new YarnConfiguration(csConf); + } else if (schedulerCls.getCanonicalName() + .equals(FairScheduler.class.getCanonicalName())) { + FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration(); + yarnConf = new YarnConfiguration(fsConf); + } else { + throw new IllegalStateException( + "Scheduler class is of wrong type: " + schedulerCls); + } + return yarnConf; + } + + private void requestMemory(MockAM am1, long memory) throws Exception { + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(TestUtils.createResource(memory, 1, + ImmutableMap.of(CUSTOM_RES, 1))) + .numContainers(1) + .resourceName("*") + .build()), null); + } + + private void requestVcores(MockAM am1, int vCores) throws Exception { + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability( + TestUtils.createResource(8 * GB, vCores, + ImmutableMap.of(CUSTOM_RES, 1))) + .numContainers(1) + .resourceName("*") + .build()), null); + } + + private void requestCustomResource(MockAM am1, + ImmutableMap customResource) throws Exception { + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(TestUtils.createResource(8 * GB, 1, customResource)) + .numContainers(1) + .resourceName("*") + .build()), null); + } + + @Before public void setup() { conf = new YarnConfiguration(); @@ -695,41 +768,12 @@ public void testFSValidateRequestCapacityAgainstMinMaxAllocation() private void testValidateRequestCapacityAgainstMinMaxAllocation(Class schedulerCls) throws Exception { - - // Initialize resource map for 2 types. - Map riMap = new HashMap<>(); - - // Initialize mandatory resources - ResourceInformation memory = ResourceInformation.newInstance( - ResourceInformation.MEMORY_MB.getName(), - ResourceInformation.MEMORY_MB.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); - ResourceInformation vcores = ResourceInformation.newInstance( - ResourceInformation.VCORES.getName(), - ResourceInformation.VCORES.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); - riMap.put(ResourceInformation.MEMORY_URI, memory); - riMap.put(ResourceInformation.VCORES_URI, vcores); - + Map riMap = + initializeMandatoryResources(); ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); - final YarnConfiguration yarnConf; - if (schedulerCls.getCanonicalName() - .equals(CapacityScheduler.class.getCanonicalName())) { - CapacitySchedulerConfiguration csConf = - new CapacitySchedulerConfiguration(); - csConf.setResourceComparator(DominantResourceCalculator.class); - yarnConf = new YarnConfiguration(csConf); - } else if (schedulerCls.getCanonicalName() - .equals(FairScheduler.class.getCanonicalName())) { - FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration(); - yarnConf = new YarnConfiguration(fsConf); - } else { - throw new IllegalStateException( - "Scheduler class is of wrong type: " + schedulerCls); - } + final YarnConfiguration yarnConf = + getYarnConfigBySchedulerClass(schedulerCls); // Don't reset resource types since we have already configured resource // types @@ -746,7 +790,7 @@ private void testValidateRequestCapacityAgainstMinMaxAllocation(Class schedul .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, null)); - RMApp app1 = rm.submitApp(GB, "app", "user", null, "default"); + RMApp app1 = rm.submitApp(GB, "app", "user", null, CS_DEFAULT_QUEUE); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); // Now request resource, memory > allowed @@ -781,26 +825,10 @@ private void testValidateRequestCapacityAgainstMinMaxAllocation(Class schedul @Test public void testValidateRequestCapacityAgainstMinMaxAllocationWithDifferentUnits() throws Exception { - - // Initialize resource map for 2 types. - Map riMap = new HashMap<>(); - - // Initialize mandatory resources - ResourceInformation memory = - ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(), - ResourceInformation.MEMORY_MB.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); - ResourceInformation vcores = - ResourceInformation.newInstance(ResourceInformation.VCORES.getName(), - ResourceInformation.VCORES.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + Map riMap = initializeMandatoryResources(); ResourceInformation res1 = - ResourceInformation.newInstance("res_1", "G", 0, 4); - riMap.put(ResourceInformation.MEMORY_URI, memory); - riMap.put(ResourceInformation.VCORES_URI, vcores); - riMap.put("res_1", res1); + ResourceInformation.newInstance(CUSTOM_RES, "G", 0, 4); + riMap.put(CUSTOM_RES, res1); ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); @@ -824,9 +852,9 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationWithDifferentUnits DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, ImmutableMap. builder() - .put("res_1", "5G").build())); + .put(CUSTOM_RES, "5G").build())); - RMApp app1 = rm.submitApp(GB, "app", "user", null, "default"); + RMApp app1 = rm.submitApp(GB, "app", "user", null, CS_DEFAULT_QUEUE); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); // Now request res_1, 500M < 5G so it should be allowed @@ -834,7 +862,7 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationWithDifferentUnits am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() .capability(ResourceTypesTestHelper.newResource(4 * GB, 1, ImmutableMap. builder() - .put("res_1", "500M") + .put(CUSTOM_RES, "500M") .build())) .numContainers(1).resourceName("*").build()), null); } catch (InvalidResourceRequestException e) { @@ -846,28 +874,12 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationWithDifferentUnits } @Test(timeout = 300000) - public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypes() + public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypesCS() throws Exception { - - // Initialize resource map for 2 types. - Map riMap = new HashMap<>(); - - // Initialize mandatory resources - ResourceInformation memory = ResourceInformation.newInstance( - ResourceInformation.MEMORY_MB.getName(), - ResourceInformation.MEMORY_MB.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); - ResourceInformation vcores = ResourceInformation.newInstance( - ResourceInformation.VCORES.getName(), - ResourceInformation.VCORES.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); - ResourceInformation res1 = ResourceInformation.newInstance("res_1", + Map riMap = initializeMandatoryResources(); + ResourceInformation res1 = ResourceInformation.newInstance(CUSTOM_RES, ResourceInformation.VCORES.getUnits(), 0, 4); - riMap.put(ResourceInformation.MEMORY_URI, memory); - riMap.put(ResourceInformation.VCORES_URI, vcores); - riMap.put("res_1", res1); + riMap.put(CUSTOM_RES, res1); ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); @@ -888,60 +900,96 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceType rm.start(); CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - LeafQueue leafQueue = (LeafQueue) cs.getQueue("default"); + LeafQueue leafQueue = (LeafQueue) cs.getQueue(CS_DEFAULT_QUEUE); MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, - ImmutableMap.of("res_1", 4))); + ImmutableMap.of(CUSTOM_RES, 4))); - RMApp app1 = rm.submitApp(GB, "app", "user", null, "default"); + RMApp app1 = rm.submitApp(GB, "app", "user", null, CS_DEFAULT_QUEUE); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); Assert.assertEquals(Resource.newInstance(GB, 1), leafQueue.getUsedResources()); // Now request resource, memory > allowed - boolean exception = false; try { - am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() - .capability(TestUtils.createResource(9 * GB, 1, - ImmutableMap.of("res_1", 1))) - .numContainers(1) - .resourceName("*") - .build()), null); - } catch (InvalidResourceRequestException e) { - exception = true; - } - Assert.assertTrue(exception); + requestMemory(am1, 9 * GB); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) {} - exception = false; try { // Now request resource, vcores > allowed - am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() - .capability( - TestUtils.createResource(8 * GB, 18, ImmutableMap.of("res_1", 1))) - .numContainers(1) - .resourceName("*") - .build()), null); - } catch (InvalidResourceRequestException e) { - exception = true; - } - Assert.assertTrue(exception); + requestVcores(am1, 18); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) {} - exception = false; try { // Now request resource, res_1 > allowed - am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() - .capability(TestUtils.createResource(8 * GB, 1, - ImmutableMap.of("res_1", 100))) - .numContainers(1) - .resourceName("*") - .build()), null); - } catch (InvalidResourceRequestException e) { - exception = true; + requestCustomResource(am1, ImmutableMap.of(CUSTOM_RES, 100)); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) {} + + rm.close(); + } + + @Test(timeout = 300000) + public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypesFS() + throws Exception { + Map riMap = initializeMandatoryResources(); + ResourceInformation res_1 = ResourceInformation.newInstance(CUSTOM_RES, + ResourceInformation.VCORES.getUnits(), 0, 4); + riMap.put(CUSTOM_RES, res_1); + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration(); + + YarnConfiguration yarnConf = new YarnConfiguration(fsConf); + yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + + // Don't reset resource types since we have already configured resource + // types + yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, + false); + MockRM rm = new MockRM(yarnConf); + rm.start(); + + FairScheduler fs = (FairScheduler) rm.getResourceScheduler(); + FSLeafQueue leafQueue = + fs.getQueueManager().getLeafQueue(FS_DEFAULT_QUEUE, false); + + MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils + .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + ImmutableMap.of(CUSTOM_RES, 4))); + + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, FS_DEFAULT_QUEUE); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + Assert.assertEquals(Resource.newInstance(1 * GB, 1), + leafQueue.getResourceUsage()); + + // Now request resource, memory > allowed + try { + requestMemory(am1, 9 * GB); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) { } - Assert.assertTrue(exception); + + try { + // Now request resource, vcores > allowed + requestVcores(am1, 18); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) { } + + try { + // Now request resource, res_1 > allowed + requestCustomResource(am1, ImmutableMap.of(CUSTOM_RES, 100)); + Assert.fail("Should throw InvalidResourceRequestException"); + } catch (InvalidResourceRequestException ignored) {} rm.close(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceCapacity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceCapacity.java new file mode 100644 index 00000000000..6588f028c4c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceCapacity.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TestApplicationMasterServiceCapacity extends + ApplicationMasterServiceTestBase { + + @Override + protected YarnConfiguration createYarnConfig() { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setResourceComparator(DominantResourceCalculator.class); + YarnConfiguration yarnConf = new YarnConfiguration(csConf); + yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + return yarnConf; + } + + @Override + protected Resource getResourceUsageForQueue(ResourceManager rm, String queue) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + LeafQueue leafQueue = (LeafQueue) cs.getQueue(CS_DEFAULT_QUEUE); + return leafQueue.getUsedResources(); + } + + @Test(timeout = 300000) + public void testValidateRequestCapacityAgainstMinMaxAllocation() throws + Exception { + super.testValidateRequestCapacityAgainstMinMaxAllocation(); + } + + @Test(timeout = 300000) + public void testRequestCapacityMinMaxAllocationForResourceTypes() + throws Exception { + testRequestCapacityMinMaxAllocationForResourceTypes(CS_DEFAULT_QUEUE); + } + + @Test(timeout=60000) + public void testInvalidIncreaseDecreaseRequest() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + try (MockRM rm = new MockRM(conf)) { + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode(DEFAULT_HOST + ":" + DEFAULT_PORT, 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(1024); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + RegisterApplicationMasterResponse registerResponse = + am1.registerAppAttempt(); + + sentRMContainerLaunched(rm, + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1)); + + // Ask for a normal increase should be successful + am1.sendContainerResizingRequest(Arrays.asList( + UpdateContainerRequest.newInstance( + 0, ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(2048), null))); + + // Target resource is negative, should fail + AllocateResponse response = + am1.sendContainerResizingRequest(Arrays.asList( + UpdateContainerRequest.newInstance(0, + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(-1), null))); + Assert.assertEquals(1, response.getUpdateErrors().size()); + Assert.assertEquals("RESOURCE_OUTSIDE_ALLOWED_RANGE", + response.getUpdateErrors().get(0).getReason()); + + // Target resource is more than maxAllocation, should fail + response = am1.sendContainerResizingRequest(Arrays.asList( + UpdateContainerRequest.newInstance(0, + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.add( + registerResponse.getMaximumResourceCapability(), + Resources.createResource(1)), null))); + Assert.assertEquals(1, response.getUpdateErrors().size()); + Assert.assertEquals("RESOURCE_OUTSIDE_ALLOWED_RANGE", + response.getUpdateErrors().get(0).getReason()); + + // Contains multiple increase/decrease requests for same containerId + response = am1.sendContainerResizingRequest(Arrays.asList( + UpdateContainerRequest.newInstance(0, + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(2048, 4), null), + UpdateContainerRequest.newInstance(0, + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.DECREASE_RESOURCE, + Resources.createResource(1024, 1), null))); + Assert.assertEquals(1, response.getUpdateErrors().size()); + Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", + response.getUpdateErrors().get(0).getReason()); + } + } + + @Test(timeout = 300000) + public void testPriorityInAllocatedResponse() throws Exception { + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // Set Max Application Priority as 10 + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode(DEFAULT_HOST + ":" + DEFAULT_PORT, 6 * GB); + + // Submit an application + Priority appPriority1 = Priority.newInstance(5); + RMApp app1 = rm.submitApp(2048, appPriority1); + + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); + List release = new ArrayList<>(); + List ask = new ArrayList<>(); + allocateRequest.setReleaseList(release); + allocateRequest.setAskList(ask); + + AllocateResponse response1 = am1.allocate(allocateRequest); + Assert.assertEquals(appPriority1, response1.getApplicationPriority()); + + // Change the priority of App1 to 8 + Priority appPriority2 = Priority.newInstance(8); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + rm.getRMAppManager().updateApplicationPriority(ugi, app1.getApplicationId(), + appPriority2); + + AllocateResponse response2 = am1.allocate(allocateRequest); + Assert.assertEquals(appPriority2, response2.getApplicationPriority()); + rm.stop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceFair.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceFair.java new file mode 100644 index 00000000000..c7926ca6b63 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceFair.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; +import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; + +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES; +import static org.junit.Assert.fail; + +public class TestApplicationMasterServiceFair extends + ApplicationMasterServiceTestBase { + + @Override + protected YarnConfiguration createYarnConfig() { + FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration(); + YarnConfiguration yarnConf = new YarnConfiguration(fsConf); + yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + return yarnConf; + } + + @Override + protected Resource getResourceUsageForQueue(ResourceManager rm, String + queue) { + FairScheduler fs = (FairScheduler) rm.getResourceScheduler(); + FSLeafQueue leafQueue = + fs.getQueueManager().getLeafQueue(FS_DEFAULT_QUEUE, false); + return leafQueue.getResourceUsage(); + } + + @Test(timeout = 300000) + public void testValidateRequestCapacityAgainstMinMaxAllocation() throws + Exception { + super.testValidateRequestCapacityAgainstMinMaxAllocation(); + } + + @Test(timeout = 300000) + public void testRequestCapacityMinMaxAllocationForResourceTypes() + throws Exception { + testRequestCapacityMinMaxAllocationForResourceTypes(FS_DEFAULT_QUEUE); + } + + @Test + public void testRequestCapacityMinMaxAllocationWithDifferentUnits() + throws Exception { + Map riMap = initializeMandatoryResources(); + ResourceInformation res1 = + ResourceInformation.newInstance(CUSTOM_RES, "G", 0, 4); + riMap.put(CUSTOM_RES, res1); + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + final YarnConfiguration yarnConf = createYarnConfig(); + // Don't reset resource types since we have already configured resource + // types + yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, + false); + yarnConf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false); + + MockRM rm = new MockRM(yarnConf); + rm.start(); + + MockNM nm1 = rm.registerNode("199.99.99.1:" + DEFAULT_PORT, + ResourceTypesTestHelper.newResource( + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + ImmutableMap. builder() + .put(CUSTOM_RES, "5G").build())); + + RMApp app1 = rm.submitApp(GB, "app", "user", null, FS_DEFAULT_QUEUE); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + // Now request res_1, 500M < 5G so it should be allowed + try { + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(ResourceTypesTestHelper.newResource(4 * GB, 1, + ImmutableMap. builder() + .put(CUSTOM_RES, "500M") + .build())) + .numContainers(1).resourceName("*").build()), null); + } catch (InvalidResourceRequestException e) { + fail( + "Allocate request should be accepted but exception was thrown: " + e); + } + + rm.close(); + } +}