diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java
new file mode 100644
index 00000000000..288a1697b77
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Test helper that builds {@link ContainerRequestEvent} instances for the
+ * allocator tests in this package.
+ */
+final class ContainerRequestCreator {
+
+  private ContainerRequestCreator() {
+    // Static helpers only; never instantiated.
+  }
+
+  /**
+   * Resource demand used when building a test container request: memory,
+   * virtual cores and optional custom resource types.
+   */
+  public static class TestResourceRequest {
+    private final int memory;
+    private final int vCores;
+    private final Map<String, String> customResources;
+
+    private TestResourceRequest(int memory, int vCores,
+        Map<String, String> customResources) {
+      this.memory = memory;
+      this.vCores = vCores;
+      this.customResources = customResources;
+    }
+
+    /** Creates a request for {@code memory} with a single vcore. */
+    public static TestResourceRequest createWithMemory(int memory) {
+      return new TestResourceRequest(memory, 1,
+          Collections.<String, String>emptyMap());
+    }
+
+    /** Creates a request for {@code memory} and {@code vCores}. */
+    public static TestResourceRequest create(int memory, int vCores) {
+      return new TestResourceRequest(memory, vCores,
+          Collections.<String, String>emptyMap());
+    }
+
+    /**
+     * Creates a request for {@code memory}, a single vcore and the given
+     * custom resources (resource name mapped to its value string).
+     */
+    public static TestResourceRequest create(int memory,
+        Map<String, String> customResources) {
+      return new TestResourceRequest(memory, 1, customResources);
+    }
+
+    /**
+     * Creates a request for {@code memory}, {@code vCores} and the given
+     * custom resources (resource name mapped to its value string).
+     */
+    public static TestResourceRequest create(int memory, int vCores,
+        Map<String, String> customResources) {
+      return new TestResourceRequest(memory, vCores, customResources);
+    }
+  }
+
+  /**
+   * Builds a request for a map task attempt that is not flagged as a retry
+   * of an earlier failed attempt.
+   */
+  static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId,
+      TestResourceRequest resourceRequest, String[] hosts) {
+    return createRequest(jobId, taskAttemptId, resourceRequest, hosts,
+        false, false);
+  }
+
+  /**
+   * Builds a container request for attempt {@code taskAttemptId} of task 0
+   * in {@code jobId}.
+   *
+   * @param resourceRequest memory, vcores and custom resources to ask for
+   * @param hosts preferred hosts; not passed on when
+   *     {@code earlierFailedAttempt} is set
+   * @param earlierFailedAttempt if true, the event is created through
+   *     {@code createContainerRequestEventForFailedContainer}
+   * @param reduce if true the task is a REDUCE task, otherwise a MAP task
+   */
+  static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId,
+      TestResourceRequest resourceRequest,
+      String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
+    final TaskType taskType = reduce ? TaskType.REDUCE : TaskType.MAP;
+    final TaskId taskId = MRBuilderUtils.newTaskId(jobId, 0, taskType);
+    final TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+        taskAttemptId);
+
+    final Resource containerNeed = Resource.newInstance(
+        resourceRequest.memory, resourceRequest.vCores);
+    for (Map.Entry<String, String> customResource :
+        resourceRequest.customResources.entrySet()) {
+      final String resourceName = customResource.getKey();
+      containerNeed.setResourceInformation(resourceName,
+          ResourceTypesTestHelper.createResourceInformation(resourceName,
+              customResource.getValue()));
+    }
+
+    if (earlierFailedAttempt) {
+      return ContainerRequestEvent
+          .createContainerRequestEventForFailedContainer(attemptId,
+              containerNeed);
+    }
+    return new ContainerRequestEvent(attemptId, containerNeed, hosts,
+        new String[]{NetworkTopology.DEFAULT_RACK});
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 7875917b68e..dfe7f71cb8a 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm; +import static org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestCreator.createRequest; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyFloat; @@ -77,6 +78,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestCreator.TestResourceRequest; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; @@ -203,7 +205,7 @@ public void testSimple() throws Exception { JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( - MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); @@ -215,12 +217,14 @@ public void testSimple() throws Exception { rm.drainEvents(); // create the container request - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = ContainerRequestCreator.createRequest + (jobId, 1, TestResourceRequest.createWithMemory(1024), + new String[]{"h1"}); 
allocator.sendRequest(event1); // send 1 more request with different resource req - ContainerRequestEvent event2 = createReq(jobId, 2, 1024, + ContainerRequestEvent event2 = ContainerRequestCreator.createRequest + (jobId, 2, TestResourceRequest.createWithMemory(1024), new String[] { "h2" }); allocator.sendRequest(event2); @@ -232,7 +236,8 @@ public void testSimple() throws Exception { Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size()); // send another request with different resource and priority - ContainerRequestEvent event3 = createReq(jobId, 3, 1024, + ContainerRequestEvent event3 = ContainerRequestCreator.createRequest + (jobId, 3, TestResourceRequest.createWithMemory(1024), new String[] { "h3" }); allocator.sendRequest(event3); @@ -242,7 +247,7 @@ public void testSimple() throws Exception { rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size()); - + // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat nodeManager2.nodeHeartbeat(true); // Node heartbeat @@ -254,19 +259,19 @@ public void testSimple() throws Exception { Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size()); checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, assigned, false); - + // check that the assigned container requests are cancelled allocator.schedule(); rm.drainEvents(); Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size()); } - - @Test + + @Test public void testMapNodeLocality() throws Exception { - // test checks that ordering of allocated containers list from the RM does - // not affect the map->container assignment done by the AM. 
If there is a - // node local container available for a map then it should be assigned to - // that container and not a rack-local container that happened to be seen + // test checks that ordering of allocated containers list from the RM does + // not affect the map->container assignment done by the AM. If there is a + // node local container available for a map then it should be assigned to + // that container and not a rack-local container that happened to be seen // earlier in the allocated containers list from the RM. // Regression test for MAPREDUCE-4893 LOG.info("Running testMapNodeLocality"); @@ -291,26 +296,29 @@ public void testMapNodeLocality() throws Exception { JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( - MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); // add resources to scheduler - MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps + MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map rm.drainEvents(); // create the container requests for maps - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = ContainerRequestCreator.createRequest( + jobId, 1, TestResourceRequest.createWithMemory(1024), + new String[]{"h1"}); allocator.sendRequest(event1); - ContainerRequestEvent event2 = createReq(jobId, 2, 1024, - new String[] { "h1" }); + ContainerRequestEvent event2 = ContainerRequestCreator.createRequest( + jobId, 2, TestResourceRequest.createWithMemory(1024), + new 
String[]{"h1"}); allocator.sendRequest(event2); - ContainerRequestEvent event3 = createReq(jobId, 3, 1024, - new String[] { "h2" }); + ContainerRequestEvent event3 = ContainerRequestCreator.createRequest( + jobId, 3, TestResourceRequest.createWithMemory(1024), + new String[]{"h2"}); allocator.sendRequest(event3); // this tells the scheduler about the requests @@ -323,7 +331,7 @@ public void testMapNodeLocality() throws Exception { // Node heartbeat from rack-local first. This makes node h3 the first in the // list of allocated containers but it should not be assigned to task1. nodeManager3.nodeHeartbeat(true); - // Node heartbeat from node-local next. This allocates 2 node local + // Node heartbeat from node-local next. This allocates 2 node local // containers for task1 and task2. These should be matched with those tasks. nodeManager1.nodeHeartbeat(true); rm.drainEvents(); @@ -381,12 +389,14 @@ public void testResource() throws Exception { rm.drainEvents(); // create the container request - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, + ContainerRequestEvent event1 = ContainerRequestCreator.createRequest( + jobId, 1, TestResourceRequest.createWithMemory(1024), new String[] { "h1" }); allocator.sendRequest(event1); // send 1 more request with different resource req - ContainerRequestEvent event2 = createReq(jobId, 2, 2048, + ContainerRequestEvent event2 = ContainerRequestCreator.createRequest( + jobId, 2, TestResourceRequest.createWithMemory(1024), new String[] { "h2" }); allocator.sendRequest(event2); @@ -440,14 +450,18 @@ public void testReducerRampdownDiagnostics() throws Exception { // create the container request final String[] locations = new String[] { host }; - allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); + allocator.sendRequest(createRequest(jobId, 0, + TestResourceRequest.createWithMemory(1024), + locations, false, true)); for (int i = 0; i < 1;) { rm.drainEvents(); i += allocator.schedule().size(); 
nm.nodeHeartbeat(true); } - allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false)); + allocator.sendRequest(createRequest(jobId, 0, + TestResourceRequest.createWithMemory(1024), + locations, true, false)); while (allocator.getTaskAttemptKillEvents().size() == 0) { rm.drainEvents(); allocator.schedule().size(); @@ -494,7 +508,9 @@ public void testPreemptReducers() throws Exception { RMContainerAllocator.ScheduledRequests scheduledRequests = allocator.getScheduledRequests(); ContainerRequestEvent event1 = - createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + createRequest(jobId, 1, + TestResourceRequest.createWithMemory(2048), + new String[] { "h1" }, false, false); scheduledRequests.maps.put(mock(TaskAttemptId.class), new RMContainerRequestor.ContainerRequest(event1, null,null)); assignedRequests.reduces.put(mock(TaskAttemptId.class), @@ -547,9 +563,12 @@ public void testNonAggressivelyPreemptReducers() throws Exception { RMContainerAllocator.ScheduledRequests scheduledRequests = allocator.getScheduledRequests(); ContainerRequestEvent event1 = - createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + createRequest(jobId, 1, + TestResourceRequest.createWithMemory(2048), + new String[] { "h1" }, false, false); scheduledRequests.maps.put(mock(TaskAttemptId.class), - new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime())); + new RMContainerRequestor.ContainerRequest(event1, null, + clock.getTime())); assignedRequests.reduces.put(mock(TaskAttemptId.class), mock(Container.class)); @@ -561,7 +580,7 @@ public void testNonAggressivelyPreemptReducers() throws Exception { clock.setTime(clock.getTime() + (preemptThreshold) * 1000); allocator.preemptReducesIfNeeded(); Assert.assertEquals("The reducer is not preeempted", 1, - assignedRequests.preemptionWaitingReduces.size()); + assignedRequests.preemptionWaitingReduces.size()); } @Test(timeout = 30000) @@ -608,9 +627,12 @@ public void testUnconditionalPreemptReducers() 
throws Exception { RMContainerAllocator.ScheduledRequests scheduledRequests = allocator.getScheduledRequests(); ContainerRequestEvent event1 = - createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + createRequest(jobId, 1, + TestResourceRequest.createWithMemory(2048), + new String[] { "h1" }, false, false); scheduledRequests.maps.put(mock(TaskAttemptId.class), - new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime())); + new RMContainerRequestor.ContainerRequest(event1, null, + clock.getTime())); assignedRequests.reduces.put(mock(TaskAttemptId.class), mock(Container.class)); @@ -652,12 +674,16 @@ public void testExcessReduceContainerAssign() throws Exception { // request to allocate two reduce priority containers final String[] locations = new String[] { host }; - allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); + allocator.sendRequest(createRequest(jobId, 0, + TestResourceRequest.createWithMemory(1024), + locations, false, true)); allocator.scheduleAllReduces(); allocator.makeRemoteRequest(); nm.nodeHeartbeat(true); rm.drainEvents(); - allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false)); + allocator.sendRequest(createRequest(jobId, 1, + TestResourceRequest.createWithMemory(1024), + locations, false, false)); int assignedContainer; for (assignedContainer = 0; assignedContainer < 1;) { @@ -684,7 +710,7 @@ public void testMapReduceAllocationWithNodeLabelExpression() throws Exception { conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes"); ApplicationId appId = ApplicationId.newInstance(1, 1); ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 1); + ApplicationAttemptId.newInstance(appId, 1); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( @@ -706,13 +732,16 @@ protected ApplicationMasterProtocol createSchedulerProxy() { // create some map requests 
ContainerRequestEvent reqMapEvents; - reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" }); + reqMapEvents = ContainerRequestCreator.createRequest(jobId, 0, + TestResourceRequest.createWithMemory(1024), new String[]{"map"}); allocator.sendRequests(Arrays.asList(reqMapEvents)); // create some reduce requests ContainerRequestEvent reqReduceEvents; reqReduceEvents = - createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true); + createRequest(jobId, 0, + TestResourceRequest.createWithMemory(2048), + new String[] { "reduce" }, false, true); allocator.sendRequests(Arrays.asList(reqReduceEvents)); allocator.schedule(); // verify all of the host-specific asks were sent plus one for the @@ -883,18 +912,21 @@ public void testMapReduceScheduling() throws Exception { // create the container request // send MAP request - ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] { - "h1", "h2" }, true, false); + ContainerRequestEvent event1 = createRequest(jobId, 1, + TestResourceRequest.createWithMemory(2048), + new String[] { "h1", "h2" }, true, false); allocator.sendRequest(event1); // send REDUCE request - ContainerRequestEvent event2 = createReq(jobId, 2, 3000, - new String[] { "h1" }, false, true); + ContainerRequestEvent event2 = createRequest(jobId, 2, + TestResourceRequest.createWithMemory(3000), + new String[] { "h1" }, false, true); allocator.sendRequest(event2); // send MAP request - ContainerRequestEvent event3 = createReq(jobId, 3, 2048, - new String[] { "h3" }, false, false); + ContainerRequestEvent event3 = createRequest(jobId, 3, + TestResourceRequest.createWithMemory(2048), + new String[] { "h3" }, false, false); allocator.sendRequest(event3); // this tells the scheduler about the requests @@ -921,10 +953,10 @@ public void testMapReduceScheduling() throws Exception { } } - private static class MyResourceManager extends MockRM { + static class MyResourceManager extends MockRM { private static long fakeClusterTimeStamp = 
System.currentTimeMillis(); - + public MyResourceManager(Configuration conf) { super(conf); } @@ -955,7 +987,7 @@ public void handle(SchedulerEvent event) { protected ResourceScheduler createScheduler() { return new MyFifoScheduler(this.getRMContext()); } - + MyFifoScheduler getMyFifoScheduler() { return (MyFifoScheduler) scheduler; } @@ -1221,7 +1253,7 @@ protected ContainerAllocator createContainerAllocator( Assert.assertEquals(0.95f, job.getProgress(), 0.001f); Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); } - + @Test public void testUpdatedNodes() throws Exception { Configuration conf = new Configuration(); @@ -1251,11 +1283,13 @@ public void testUpdatedNodes() throws Exception { rm.drainEvents(); // create the map container request - ContainerRequestEvent event = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event = + ContainerRequestCreator.createRequest(jobId, 1, + TestResourceRequest.createWithMemory(1024), + new String[] { "h1" }); allocator.sendRequest(event); TaskAttemptId attemptId = event.getAttemptID(); - + TaskAttempt mockTaskAttempt = mock(TaskAttempt.class); when(mockTaskAttempt.getNodeId()).thenReturn(nm1.getNodeId()); Task mockTask = mock(Task.class); @@ -1279,7 +1313,7 @@ public void testUpdatedNodes() throws Exception { // no updated nodes reported Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty()); Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty()); - + // mark nodes bad nm1.nodeHeartbeat(false); nm2.nodeHeartbeat(false); @@ -1292,11 +1326,13 @@ public void testUpdatedNodes() throws Exception { // updated nodes are reported Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size()); Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size()); - Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size()); - Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID()); + Assert.assertEquals(2, + 
allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size()); + Assert.assertEquals(attemptId, + allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID()); allocator.getJobUpdatedNodeEvents().clear(); allocator.getTaskAttemptKillEvents().clear(); - + assigned = allocator.schedule(); rm.drainEvents(); Assert.assertEquals(0, assigned.size()); @@ -1307,7 +1343,7 @@ public void testUpdatedNodes() throws Exception { @Test public void testBlackListedNodes() throws Exception { - + LOG.info("Running testBlackListedNodes"); Configuration conf = new Configuration(); @@ -1315,7 +1351,7 @@ public void testBlackListedNodes() throws Exception { conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); conf.setInt( MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); - + MyResourceManager rm = new MyResourceManager(conf); rm.start(); @@ -1331,7 +1367,7 @@ public void testBlackListedNodes() throws Exception { .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); rm.drainEvents(); - + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( @@ -1347,18 +1383,24 @@ public void testBlackListedNodes() throws Exception { rm.drainEvents(); // create the container request - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = + ContainerRequestCreator.createRequest(jobId, 1, + TestResourceRequest.createWithMemory(1024), + new String[] { "h1" }); allocator.sendRequest(event1); // send 1 more request with different resource req - ContainerRequestEvent event2 = createReq(jobId, 2, 1024, - new String[] { "h2" }); + ContainerRequestEvent event2 = + ContainerRequestCreator.createRequest(jobId, 2, + TestResourceRequest.createWithMemory(1024), + new String[] { "h2" }); allocator.sendRequest(event2); // send another request with different resource and priority - ContainerRequestEvent event3 = createReq(jobId, 3, 1024, 
- new String[] { "h3" }); + ContainerRequestEvent event3 = + ContainerRequestCreator.createRequest(jobId, 3, + TestResourceRequest.createWithMemory(1024), + new String[] { "h3" }); allocator.sendRequest(event3); // this tells the scheduler about the requests @@ -1368,9 +1410,9 @@ public void testBlackListedNodes() throws Exception { Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Send events to blacklist nodes h1 and h2 - ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); + ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); allocator.sendFailure(f1); - ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false); + ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false); allocator.sendFailure(f2); // update resources in scheduler @@ -1392,23 +1434,23 @@ public void testBlackListedNodes() throws Exception { assigned = allocator.schedule(); rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); nodeManager3.nodeHeartbeat(true); // Node heartbeat rm.drainEvents(); assigned = allocator.schedule(); rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - + Assert.assertTrue("No of assignments must be 3", assigned.size() == 3); - + // validate that all containers are assigned to h3 for (TaskAttemptContainerAssignedEvent assig : assigned) { Assert.assertTrue("Assigned container host not correct", "h3".equals(assig .getContainer().getNodeId().getHost())); } } - + @Test public void testIgnoreBlacklisting() throws Exception { LOG.info("Running testIgnoreBlacklisting"); @@ -1543,7 +1585,7 @@ public void testIgnoreBlacklisting() throws Exception { getContainerOnHost(jobId, 10, 1024, new String[] { "h3" }, nodeManagers[2], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); - + // Assign on 5 more 
nodes - to re-enable blacklisting for (int i = 0; i < 5; i++) { nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); @@ -1576,7 +1618,8 @@ private MockNM registerNodeManager(int i, MyResourceManager rm) int expectedAdditions2, int expectedRemovals2, MyResourceManager rm) throws Exception { ContainerRequestEvent reqEvent = - createReq(jobId, taskAttemptId, memory, hosts); + ContainerRequestCreator.createRequest(jobId, taskAttemptId, + TestResourceRequest.createWithMemory(memory), hosts); allocator.sendRequest(reqEvent); // Send the request to the RM @@ -1596,7 +1639,7 @@ private MockNM registerNodeManager(int i, MyResourceManager rm) expectedAdditions2, expectedRemovals2, rm); return assigned; } - + @Test public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { LOG.info("Running testBlackListedNodesWithSchedulingToThatNode"); @@ -1606,7 +1649,7 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); conf.setInt( MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); - + MyResourceManager rm = new MyResourceManager(conf); rm.start(); @@ -1622,7 +1665,7 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); rm.drainEvents(); - + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( @@ -1638,8 +1681,10 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { LOG.info("Requesting 1 Containers _1 on H1"); // create the container request - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = + ContainerRequestCreator.createRequest(jobId, 1, + TestResourceRequest.createWithMemory(1024), + new String[] { "h1" }); allocator.sendRequest(event1); LOG.info("RM Heartbeat (to send the container requests)"); 
@@ -1653,13 +1698,13 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat rm.drainEvents(); - + LOG.info("RM Heartbeat (To process the scheduled containers)"); assigned = allocator.schedule(); rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); - + Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); + LOG.info("Failing container _1 on H1 (should blacklist the node)"); // Send events to blacklist nodes h1 and h2 ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); @@ -1667,8 +1712,9 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { //At this stage, a request should be created for a fast fail map //Create a FAST_FAIL request for a previously failed map. - ContainerRequestEvent event1f = createReq(jobId, 1, 1024, - new String[] { "h1" }, true, false); + ContainerRequestEvent event1f = createRequest(jobId, 1, + TestResourceRequest.createWithMemory(1024), + new String[] { "h1" }, true, false); allocator.sendRequest(event1f); //Update the Scheduler with the new requests. @@ -1678,24 +1724,26 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // send another request with different resource and priority - ContainerRequestEvent event3 = createReq(jobId, 3, 1024, - new String[] { "h1", "h3" }); + ContainerRequestEvent event3 = + ContainerRequestCreator.createRequest(jobId, 3, + TestResourceRequest.createWithMemory(1024), + new String[] { "h1", "h3" }); allocator.sendRequest(event3); - + //Allocator is aware of prio:5 container, and prio:20 (h1+h3) container. 
//RM is only aware of the prio:5 container - + LOG.info("h1 Heartbeat (To actually schedule the containers)"); // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat rm.drainEvents(); - + LOG.info("RM Heartbeat (To process the scheduled containers)"); assigned = allocator.schedule(); rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); - + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + //RMContainerAllocator gets assigned a p:5 on a blacklisted node. //Send a release for the p:5 container + another request. @@ -1704,26 +1752,26 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); - + //Hearbeat from H3 to schedule on this host. LOG.info("h3 Heartbeat (To re-schedule the containers)"); nodeManager3.nodeHeartbeat(true); // Node heartbeat rm.drainEvents(); - + LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)"); assigned = allocator.schedule(); assertBlacklistAdditionsAndRemovals(0, 0, rm); rm.drainEvents(); - + // For debugging for (TaskAttemptContainerAssignedEvent assig : assigned) { LOG.info(assig.getTaskAttemptID() + " assgined to " + assig.getContainer().getId() + " with priority " + assig.getContainer().getPriority()); } - + Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); - + // validate that all containers are assigned to h3 for (TaskAttemptContainerAssignedEvent assig : assigned) { Assert.assertEquals("Assigned container " + assig.getContainer().getId() @@ -1759,13 +1807,13 @@ public MyFifoScheduler(RMContext rmContext) { assert (false); } } - + List lastAsk = null; List lastRelease = null; List lastBlacklistAdditions; List lastBlacklistRemovals; Resource forceResourceLimit = null; - + // override this to copy 
the objects otherwise FifoScheduler updates the // numContainers in same objects as kept by RMContainerAllocator @Override @@ -1855,38 +1903,6 @@ public synchronized Allocation allocate( } } - private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, - int memory, String[] hosts) { - return createReq(jobId, taskAttemptId, memory, 1, hosts, false, false); - } - - private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, - int mem, String[] hosts, boolean earlierFailedAttempt, boolean reduce) { - return createReq(jobId, taskAttemptId, mem, - 1, hosts, earlierFailedAttempt, reduce); - } - - private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, - int memory, int vcore, String[] hosts, boolean earlierFailedAttempt, - boolean reduce) { - TaskId taskId; - if (reduce) { - taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); - } else { - taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); - } - TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, - taskAttemptId); - Resource containerNeed = Resource.newInstance(memory, vcore); - if (earlierFailedAttempt) { - return ContainerRequestEvent - .createContainerRequestEventForFailedContainer(attemptId, - containerNeed); - } - return new ContainerRequestEvent(attemptId, containerNeed, hosts, - new String[] { NetworkTopology.DEFAULT_RACK }); - } - private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId, String host, boolean reduce) { TaskId taskId; @@ -1897,9 +1913,9 @@ private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId, } TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId); - return new ContainerFailedEvent(attemptId, host); + return new ContainerFailedEvent(attemptId, host); } - + private ContainerAllocatorEvent createDeallocateEvent(JobId jobId, int taskAttemptId, boolean reduce) { TaskId taskId; @@ -1957,12 +1973,12 @@ private void checkAssignment(ContainerRequestEvent request, // 
Mock RMContainerAllocator // Instead of talking to remote Scheduler,uses the local Scheduler - private static class MyContainerAllocator extends RMContainerAllocator { + static class MyContainerAllocator extends RMContainerAllocator { static final List events = new ArrayList(); - static final List taskAttemptKillEvents + static final List taskAttemptKillEvents = new ArrayList(); - static final List jobUpdatedNodeEvents + static final List jobUpdatedNodeEvents = new ArrayList(); static final List jobEvents = new ArrayList(); private MyResourceManager rm; @@ -2081,7 +2097,7 @@ public void sendRequests(List reqs) { public void sendFailure(ContainerFailedEvent f) { super.handleEvent(f); } - + public void sendDeallocate(ContainerAllocatorEvent f) { super.handleEvent(f); } @@ -2104,11 +2120,11 @@ public Boolean get() { events.clear(); return result; } - + static List getTaskAttemptKillEvents() { return taskAttemptKillEvents; } - + static List getJobUpdatedNodeEvents() { return jobUpdatedNodeEvents; } @@ -2117,12 +2133,12 @@ public Boolean get() { protected void startAllocatorThread() { // override to NOT start thread } - + @Override protected boolean isApplicationMasterRegistered() { return super.isApplicationMasterRegistered(); } - + public boolean isUnregistered() { return isUnregistered; } @@ -2164,7 +2180,7 @@ public void testReduceScheduling() throws Exception { int numPendingReduces = 4; float maxReduceRampupLimit = 0.5f; float reduceSlowStart = 0.2f; - + RMContainerAllocator allocator = mock(RMContainerAllocator.class); doCallRealMethod().when(allocator).scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), any(Resource.class), @@ -2174,14 +2190,14 @@ public void testReduceScheduling() throws Exception { // Test slow-start allocator.scheduleReduces( - totalMaps, succeededMaps, - scheduledMaps, scheduledReduces, - assignedMaps, assignedReduces, - mapResourceReqt, reduceResourceReqt, - numPendingReduces, + totalMaps, succeededMaps, + 
scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator, never()).setIsReduceStarted(true); - + // verify slow-start still in effect when no more maps need to // be scheduled but some have yet to complete allocator.scheduleReduces( @@ -2197,23 +2213,23 @@ public void testReduceScheduling() throws Exception { succeededMaps = 3; doReturn(BuilderUtils.newResource(0, 0)).when(allocator).getResourceLimit(); allocator.scheduleReduces( - totalMaps, succeededMaps, - scheduledMaps, scheduledReduces, - assignedMaps, assignedReduces, - mapResourceReqt, reduceResourceReqt, - numPendingReduces, + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator, times(1)).setIsReduceStarted(true); - + // Test reduce ramp-up doReturn(BuilderUtils.newResource(100 * 1024, 100 * 1)).when(allocator) .getResourceLimit(); allocator.scheduleReduces( - totalMaps, succeededMaps, - scheduledMaps, scheduledReduces, - assignedMaps, assignedReduces, - mapResourceReqt, reduceResourceReqt, - numPendingReduces, + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator).rampUpReduces(anyInt()); verify(allocator, never()).rampDownReduces(anyInt()); @@ -2232,18 +2248,18 @@ public void testReduceScheduling() throws Exception { verify(allocator).rampDownReduces(anyInt()); // Test reduce ramp-down for when there are scheduled maps - // Since we have two scheduled Maps, rampDownReducers + // Since we have two scheduled Maps, rampDownReducers // should be invoked twice. 
scheduledMaps = 2; assignedReduces = 2; doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator) .getResourceLimit(); allocator.scheduleReduces( - totalMaps, succeededMaps, - scheduledMaps, scheduledReduces, - assignedMaps, assignedReduces, - mapResourceReqt, reduceResourceReqt, - numPendingReduces, + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator, times(2)).rampDownReduces(anyInt()); @@ -2288,7 +2304,7 @@ public void scheduleReduces(int totalMaps, int completedMaps, recalculatedReduceSchedule = true; } } - + @Test public void testCompletedTasksRecalculateSchedule() throws Exception { LOG.info("Running testCompletedTasksRecalculateSchedule"); @@ -2400,31 +2416,33 @@ public void testCompletedContainerEvent() { RMContainerAllocator allocator = new RMContainerAllocator( mock(ClientService.class), mock(AppContext.class), new NoopAMPreemptionPolicy()); - + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId( MRBuilderUtils.newTaskId( MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1); ApplicationId applicationId = ApplicationId.newInstance(1, 1); - ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance( - applicationId, 1); - ContainerId containerId = ContainerId.newContainerId(applicationAttemptId, 1); + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(applicationId, 1); + ContainerId containerId = + ContainerId.newContainerId(applicationAttemptId, 1); ContainerStatus status = ContainerStatus.newInstance( containerId, ContainerState.RUNNING, "", 0); ContainerStatus abortedStatus = ContainerStatus.newInstance( containerId, ContainerState.RUNNING, "", ContainerExitStatus.ABORTED); - + TaskAttemptEvent event = allocator.createContainerFinishedEvent(status, attemptId); Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED, 
event.getType()); - + TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent( abortedStatus, attemptId); Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType()); - - ContainerId containerId2 = ContainerId.newContainerId(applicationAttemptId, 2); + + ContainerId containerId2 = + ContainerId.newContainerId(applicationAttemptId, 2); ContainerStatus status2 = ContainerStatus.newInstance(containerId2, ContainerState.RUNNING, "", 0); @@ -2440,7 +2458,7 @@ public void testCompletedContainerEvent() { preemptedStatus, attemptId); Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType()); } - + @Test public void testUnregistrationOnlyIfRegistered() throws Exception { Configuration conf = new Configuration(); @@ -2483,7 +2501,7 @@ protected ContainerAllocator createContainerAllocator( mrApp.stop(); Assert.assertTrue(allocator.isUnregistered()); } - + // Step-1 : AM send allocate request for 2 ContainerRequests and 1 // blackListeNode // Step-2 : 2 containers are allocated by RM. 
@@ -2542,11 +2560,15 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() // create the container request // send MAP request ContainerRequestEvent event1 = - createReq(jobId, 1, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 1, + TestResourceRequest.createWithMemory(1024), + new String[]{"h1"}); allocator.sendRequest(event1); ContainerRequestEvent event2 = - createReq(jobId, 2, 2048, new String[] { "h1", "h2" }); + ContainerRequestCreator.createRequest(jobId, 2, + TestResourceRequest.createWithMemory(2048), + new String[] { "h1", "h2" }); allocator.sendRequest(event2); // Send events to blacklist h2 @@ -2584,7 +2606,9 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() // RM // send container request ContainerRequestEvent event3 = - createReq(jobId, 3, 1000, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 3, + TestResourceRequest.createWithMemory(1000), + new String[]{"h1"}); allocator.sendRequest(event3); // send deallocate request @@ -2628,7 +2652,9 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() allocator.sendFailure(f2); ContainerRequestEvent event4 = - createReq(jobId, 4, 2000, new String[] { "h1", "h2" }); + ContainerRequestCreator.createRequest(jobId, 4, + TestResourceRequest.createWithMemory(2000), + new String[]{"h1", "h2"}); allocator.sendRequest(event4); // send allocate request to 2nd RM and get resync command @@ -2639,7 +2665,9 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() // asks,release,blacklistAaddition // and another containerRequest(event5) ContainerRequestEvent event5 = - createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" }); + ContainerRequestCreator.createRequest(jobId, 5, + TestResourceRequest.createWithMemory(3000), + new String[]{"h1", "h2", "h3"}); allocator.sendRequest(event5); // send all outstanding request again. 
@@ -2696,10 +2724,10 @@ protected Resource getMaxContainerCapability() { } }; - ContainerRequestEvent mapRequestEvt = createReq(jobId, 0, - (int) (maxContainerSupported.getMemorySize() + 10), - maxContainerSupported.getVirtualCores(), - new String[0], false, false); + final int memory = (int) (maxContainerSupported.getMemorySize() + 10); + ContainerRequestEvent mapRequestEvt = createRequest(jobId, 0, + TestResourceRequest.create(memory, maxContainerSupported.getVirtualCores()), + new String[0], false, false); allocator.sendRequests(Arrays.asList(mapRequestEvt)); allocator.schedule(); @@ -2734,10 +2762,11 @@ protected Resource getMaxContainerCapability() { } }; - ContainerRequestEvent reduceRequestEvt = createReq(jobId, 0, - (int) (maxContainerSupported.getMemorySize() + 10), - maxContainerSupported.getVirtualCores(), - new String[0], false, true); + final int memory = (int) (maxContainerSupported.getMemorySize() + 10); + ContainerRequestEvent reduceRequestEvt = createRequest(jobId, 0, + TestResourceRequest.create(memory, + maxContainerSupported.getVirtualCores()), + new String[0], false, true); allocator.sendRequests(Arrays.asList(reduceRequestEvt)); // Reducer container requests are added to the pending queue upon request, // schedule all reducers here so that we can observe if reducer requests @@ -2787,8 +2816,9 @@ public void testRMUnavailable() rm1.drainEvents(); Assert.assertEquals("Should Have 1 Job Event", 1, allocator.jobEvents.size()); - JobEvent event = allocator.jobEvents.get(0); - Assert.assertTrue("Should Reboot", event.getType().equals(JobEventType.JOB_AM_REBOOT)); + JobEvent event = allocator.jobEvents.get(0); + Assert.assertTrue("Should Reboot", + event.getType().equals(JobEventType.JOB_AM_REBOOT)); } @Test(timeout=60000) @@ -2920,7 +2950,9 @@ protected void setRequestLimit(Priority priority, // create some map requests ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT]; for (int i = 0; i < reqMapEvents.length; ++i) { - 
reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i }); + reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i, + TestResourceRequest.createWithMemory(1024), + new String[] { "h" + i }); } allocator.sendRequests(Arrays.asList(reqMapEvents)); // create some reduce requests @@ -2928,7 +2960,8 @@ protected void setRequestLimit(Priority priority, new ContainerRequestEvent[REDUCE_COUNT]; for (int i = 0; i < reqReduceEvents.length; ++i) { reqReduceEvents[i] = - createReq(jobId, i, 1024, new String[] {}, false, true); + createRequest(jobId, i, TestResourceRequest.createWithMemory(1024), + new String[] {}, false, true); } allocator.sendRequests(Arrays.asList(reqReduceEvents)); allocator.schedule(); @@ -2975,14 +3008,17 @@ protected ApplicationMasterProtocol createSchedulerProxy() { // create some map requests ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT]; for (int i = 0; i < reqMapEvents.length; ++i) { - reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i }); + reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i, + TestResourceRequest.createWithMemory(1024), new String[] { "h" + i }); } allocator.sendRequests(Arrays.asList(reqMapEvents)); // create some reduce requests - ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT]; + ContainerRequestEvent[] reqReduceEvents = + new ContainerRequestEvent[REDUCE_COUNT]; for (int i = 0; i < reqReduceEvents.length; ++i) { - reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {}, - false, true); + reqReduceEvents[i] = + createRequest(jobId, i, TestResourceRequest.createWithMemory(1024), + new String[] {}, false, true); } allocator.sendRequests(Arrays.asList(reqReduceEvents)); allocator.schedule(); @@ -3137,13 +3173,19 @@ public void testUpdateAskOnRampDownAllReduces() throws Exception { // Request 2 maps and 1 reducer(sone on nodes which are not registered). 
ContainerRequestEvent event1 = - createReq(jobId, 1, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 1, + TestResourceRequest.createWithMemory(1024), + new String[]{"h1"}); allocator.sendRequest(event1); ContainerRequestEvent event2 = - createReq(jobId, 2, 1024, new String[] { "h2" }); + ContainerRequestCreator.createRequest(jobId, 2, + TestResourceRequest.createWithMemory(1024), + new String[]{"h2"}); allocator.sendRequest(event2); ContainerRequestEvent event3 = - createReq(jobId, 3, 1024, new String[] { "h2" }, false, true); + createRequest(jobId, 3, + TestResourceRequest.createWithMemory(1024), + new String[]{"h2"}, false, true); allocator.sendRequest(event3); // This will tell the scheduler about the requests but there will be no @@ -3156,7 +3198,8 @@ public void testUpdateAskOnRampDownAllReduces() throws Exception { // Request for another reducer on h3 which has not registered. ContainerRequestEvent event4 = - createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); + createRequest(jobId, 4, TestResourceRequest.createWithMemory(1024), + new String[] { "h3" }, false, true); allocator.sendRequest(event4); allocator.schedule(); @@ -3301,13 +3344,18 @@ public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() // Request 2 maps and 1 reducer(sone on nodes which are not registered). 
ContainerRequestEvent event1 = - createReq(jobId, 1, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 1, + TestResourceRequest.createWithMemory(1024), + new String[]{"h1"}); allocator.sendRequest(event1); ContainerRequestEvent event2 = - createReq(jobId, 2, 1024, new String[] { "h2" }); + ContainerRequestCreator.createRequest(jobId, 2, + TestResourceRequest.createWithMemory(1024), + new String[]{"h2"}); allocator.sendRequest(event2); ContainerRequestEvent event3 = - createReq(jobId, 3, 1024, new String[] { "h2" }, false, true); + createRequest(jobId, 3, TestResourceRequest.createWithMemory(1024), + new String[]{"h2"}, false, true); allocator.sendRequest(event3); // This will tell the scheduler about the requests but there will be no @@ -3320,7 +3368,8 @@ public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() // Request for another reducer on h3 which has not registered. ContainerRequestEvent event4 = - createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); + createRequest(jobId, 4, TestResourceRequest.createWithMemory(1024), + new String[]{"h3"}, false, true); allocator.sendRequest(event4); allocator.schedule(); @@ -3433,13 +3482,19 @@ public void testExcludeSchedReducesFromHeadroom() throws Exception { // Request 2 maps and 1 reducer(sone on nodes which are not registered). 
ContainerRequestEvent event1 = - createReq(jobId, 1, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 1, + TestResourceRequest.createWithMemory(1024), + new String[]{"h1"}); allocator.sendRequest(event1); ContainerRequestEvent event2 = - createReq(jobId, 2, 1024, new String[] { "h2" }); + ContainerRequestCreator.createRequest(jobId, 2, + TestResourceRequest.createWithMemory(1024), + new String[]{"h2"}); allocator.sendRequest(event2); ContainerRequestEvent event3 = - createReq(jobId, 3, 1024, new String[] { "h1" }, false, true); + createRequest(jobId, 3, + TestResourceRequest.createWithMemory(1024), + new String[]{"h1"}, false, true); allocator.sendRequest(event3); // This will tell the scheduler about the requests but there will be no @@ -3449,7 +3504,8 @@ public void testExcludeSchedReducesFromHeadroom() throws Exception { // Request for another reducer on h3 which has not registered. ContainerRequestEvent event4 = - createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); + createRequest(jobId, 4, TestResourceRequest.createWithMemory(1024), + new String[] { "h3" }, false, true); allocator.sendRequest(event4); allocator.schedule(); @@ -3486,7 +3542,9 @@ public void testExcludeSchedReducesFromHeadroom() throws Exception { // Send request for one more mapper. 
ContainerRequestEvent event5 = - createReq(jobId, 5, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 5, + TestResourceRequest.createWithMemory(1024), + new String[]{"h1"}); allocator.sendRequest(event5); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java index 7a212e163d9..440ed9fb48a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java @@ -175,16 +175,8 @@ public static long convert(String fromUnit, String toUnit, long fromValue) { */ public static int compare(String unitA, long valueA, String unitB, long valueB) { - if (unitA == null || unitB == null || !KNOWN_UNITS.contains(unitA) - || !KNOWN_UNITS.contains(unitB)) { - throw new IllegalArgumentException("Units cannot be null"); - } - if (!KNOWN_UNITS.contains(unitA)) { - throw new IllegalArgumentException("Unknown unit '" + unitA + "'"); - } - if (!KNOWN_UNITS.contains(unitB)) { - throw new IllegalArgumentException("Unknown unit '" + unitB + "'"); - } + checkUnitArgument(unitA); + checkUnitArgument(unitB); if (unitA.equals(unitB)) { return Long.compare(valueA, valueB); } @@ -218,4 +210,37 @@ public static int compare(String unitA, long valueA, String unitB, return tmpA.compareTo(tmpB); } } + + private static void checkUnitArgument(String unit) { + // Null and unknown units are distinct errors; report each accurately. + if (unit == null) { + throw new IllegalArgumentException("Unit cannot be null"); + } else if (!KNOWN_UNITS.contains(unit)) { + throw new IllegalArgumentException("Unknown unit '" + unit + "'"); + } + } + + /** + * Compare a unit to another unit. + *
+ * Examples:
+ * 1. 'm' (milli) is smaller than 'k' (kilo), so compareUnits("m", "k") + * will return -1.
 + * 2. 'M' (MEGA) is greater than 'k' (kilo), so compareUnits("M", "k") will + * return 1. + * + * @param unitA first unit + * @param unitB second unit + * @return +1, 0 or -1 depending on whether the first unit + * is greater than, + * equal to or lesser than the second unit. + */ + public static int compareUnits(String unitA, String unitB) { + checkUnitArgument(unitA); + checkUnitArgument(unitB); + int unitAPos = SORTED_UNITS.indexOf(unitA); + int unitBPos = SORTED_UNITS.indexOf(unitB); + + return Integer.compare(unitAPos, unitBPos); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java new file mode 100644 index 00000000000..3bb077baabe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java @@ -0,0 +1,69 @@ +package org.apache.hadoop.yarn.resourcetypes; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ResourceTypesTestHelper { + + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private static class ResourceValueAndUnit { + private final Long value; + private final String unit; + + private ResourceValueAndUnit(Long value, String unit) { + this.value = value; + this.unit = unit; + } + } + + public static Resource newResource(long memory, int vCores, Map customResources) { + Resource resource = recordFactory.newRecordInstance(Resource.class); + resource.setMemorySize(memory); +
resource.setVirtualCores(vCores); + + for (Map.Entry customResource : + customResources.entrySet()) { + String resourceName = customResource.getKey(); + ResourceInformation resourceInformation = + createResourceInformation(resourceName, + customResource.getValue()); + resource.setResourceInformation(resourceName, resourceInformation); + } + return resource; + } + + public static ResourceInformation createResourceInformation(String + resourceName, String descriptor) { + ResourceValueAndUnit resourceValueAndUnit = + getResourceValueAndUnit(descriptor); + return ResourceInformation + .newInstance(resourceName, resourceValueAndUnit.unit, + resourceValueAndUnit.value); + } + + private static ResourceValueAndUnit getResourceValueAndUnit(String val) { + String patternStr = "(\\d+)([A-Za-z]*)"; // digits, then optional unit letters only + final Pattern pattern = Pattern.compile(patternStr); + Matcher matcher = pattern.matcher(val); + if (!matcher.find()) { + throw new RuntimeException("Invalid pattern of resource descriptor: " + + val); + } else if (matcher.groupCount() != 2) { + throw new RuntimeException("Capturing group count in string " + + val + " is not 2!"); + } + long value = Long.parseLong(matcher.group(1)); + + return new ResourceValueAndUnit(value, matcher.group(2)); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 4ba1bdfd1ec..bbd4e62b84b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -183,7 +183,7 @@ public static ContainerId newContainerId(RecordFactory recordFactory, public
static NodeId newNodeId(String host, int port) { return NodeId.newInstance(host, port); } - + public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime) { @@ -421,7 +421,7 @@ public static ApplicationReport newApplicationReport( report.setPriority(priority); return report; } - + public static ApplicationSubmissionContext newApplicationSubmissionContext( ApplicationId applicationId, String applicationName, String queue, Priority priority, ContainerLaunchContext amContainer, @@ -476,6 +476,10 @@ public static Resource newResource(long memory, int vCores) { return resource; } + public static Resource newEmptyResource() { + return recordFactory.newRecordInstance(Resource.class); + } + public static URL newURL(String scheme, String host, int port, String file) { URL url = recordFactory.newRecordInstance(URL.class); url.setScheme(scheme); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 27563d68c84..8244931fcfa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import 
org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; @@ -268,24 +269,10 @@ public static void normalizeAndvalidateRequest(ResourceRequest resReq, private static void validateResourceRequest(ResourceRequest resReq, Resource maximumResource, QueueInfo queueInfo, RMContext rmContext) throws InvalidResourceRequestException { - Resource requestedResource = resReq.getCapability(); - for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) { - ResourceInformation reqRI = requestedResource.getResourceInformation(i); - ResourceInformation maxRI = maximumResource.getResourceInformation(i); - if (reqRI.getValue() < 0 || reqRI.getValue() > maxRI.getValue()) { - throw new InvalidResourceRequestException( - "Invalid resource request, requested resource type=[" + reqRI - .getName() - + "] < 0 or greater than maximum allowed allocation. 
Requested " - + "resource=" + requestedResource - + ", maximum allowed allocation=" + maximumResource - + ", please note that maximum allowed allocation is calculated " - + "by scheduler based on maximum resource of registered " - + "NodeManagers, which might be less than configured " - + "maximum allocation=" + ResourceUtils - .getResourceTypesMaximumAllocation()); - } - } + final Resource requestedResource = resReq.getCapability(); + checkResourceRequestAgainstAvailableResource(requestedResource, + maximumResource); + String labelExp = resReq.getNodeLabelExpression(); // we don't allow specify label expression other than resourceName=ANY now if (!ResourceRequest.ANY.equals(resReq.getResourceName()) @@ -323,6 +310,74 @@ private static void validateResourceRequest(ResourceRequest resReq, } } + @Private + @VisibleForTesting + static void checkResourceRequestAgainstAvailableResource(Resource reqResource, + Resource availableResource) throws InvalidResourceRequestException { + for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) { + final ResourceInformation requestedRI = + reqResource.getResourceInformation(i); + final String reqResourceName = requestedRI.getName(); + + if (requestedRI.getValue() < 0) { + throwInvalidResourceException(reqResource, availableResource, + reqResourceName); + } + + final ResourceInformation availableRI = + availableResource.getResourceInformation(reqResourceName); + + long requestedResourceValue = requestedRI.getValue(); + long availableResourceValue = availableRI.getValue(); + int unitsRelation = UnitsConversionUtil + .compareUnits(requestedRI.getUnits(), availableRI.getUnits()); + + LOG.debug("Requested resource information: " + requestedRI); + LOG.debug("Available resource information: " + availableRI); + LOG.debug("Relation of units: " + unitsRelation); + + // requested resource unit is less than available resource unit + // e.g. 
requestedUnit: "m", availableUnit: "k" + if (unitsRelation < 0) { + availableResourceValue = + UnitsConversionUtil.convert(availableRI.getUnits(), + requestedRI.getUnits(), availableRI.getValue()); + + // requested resource unit is greater than available resource unit + // e.g. requestedUnit: "G", availableUnit: "M" + } else if (unitsRelation > 0) { + requestedResourceValue = + UnitsConversionUtil.convert(requestedRI.getUnits(), + availableRI.getUnits(), requestedRI.getValue()); + } + + LOG.debug("Requested resource value after conversion: " + + requestedResourceValue); + LOG.debug("Available resource value after conversion: " + + availableResourceValue); + + if (requestedResourceValue > availableResourceValue) { + throwInvalidResourceException(reqResource, availableResource, + reqResourceName); + } + } + } + + private static void throwInvalidResourceException( + Resource reqResource, Resource availableResource, String reqResourceName) + throws InvalidResourceRequestException { + throw new InvalidResourceRequestException( + "Invalid resource request, requested resource type=[" + reqResourceName + + "] < 0 or greater than maximum allowed allocation.
Requested " + + "resource=" + reqResource + ", maximum allowed allocation=" + + availableResource + + ", please note that maximum allowed allocation is calculated " + + "by scheduler based on maximum resource of registered " + + "NodeManagers, which might be less than configured " + + "maximum allocation=" + + ResourceUtils.getResourceTypesMaximumAllocation()); + } + private static void checkQueueLabelInLabelManager(String labelExpression, RMContext rmContext) throws InvalidLabelResourceRequestException { // check node label manager contains this label diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 90e4be83d08..d348b988be0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -22,9 +22,13 @@ import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES; + +import static org.junit.Assert.fail; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -61,6 +65,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import 
org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; +import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -75,6 +80,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair + .FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; @@ -365,7 +373,7 @@ public void testInvalidContainerReleaseRequest() throws Exception { am2.addContainerToBeReleased(cId); try { am2.schedule(); - Assert.fail("Exception was expected!!"); + fail("Exception was expected!!"); } catch (InvalidContainerReleaseException e) { StringBuilder sb = new StringBuilder("Cannot release container : "); sb.append(cId.toString()); @@ -460,7 +468,7 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception { FinalApplicationStatus.FAILED, "", ""); try { am1.unregisterAppAttempt(req, false); - Assert.fail("ApplicationMasterNotRegisteredException should be thrown"); + fail("ApplicationMasterNotRegisteredException should be thrown"); } catch (ApplicationMasterNotRegisteredException e) { Assert.assertNotNull(e); Assert.assertNotNull(e.getMessage()); @@ -468,7 +476,7 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception { "Application Master is trying to unregister before registering for:" )); } catch (Exception e) { - Assert.fail("ApplicationMasterNotRegisteredException 
should be thrown"); + fail("ApplicationMasterNotRegisteredException should be thrown"); } am1.registerAppAttempt(); @@ -627,9 +635,7 @@ public void testInvalidIncreaseDecreaseRequest() throws Exception { Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", response.getUpdateErrors().get(0).getReason()); } finally { - if (rm != null) { - rm.close(); - } + rm.close(); } } @@ -709,11 +715,22 @@ private void testValidateRequestCapacityAgainstMinMaxAllocation(Class schedul ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); - CapacitySchedulerConfiguration csconf = - new CapacitySchedulerConfiguration(); - csconf.setResourceComparator(DominantResourceCalculator.class); - - YarnConfiguration conf = new YarnConfiguration(csconf); + final YarnConfiguration conf; + if (schedulerCls.getCanonicalName() + .equals(CapacityScheduler.class.getCanonicalName())) { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setResourceComparator(DominantResourceCalculator.class); + conf = new YarnConfiguration(csConf); + } else if (schedulerCls.getCanonicalName() + .equals(FairScheduler.class.getCanonicalName())) { + FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration(); + conf = new YarnConfiguration(fsConf); + } else { + throw new IllegalStateException( + "Scheduler class is of wrong type: " + schedulerCls); + } + // Don't reset resource types since we have already configured resource // types conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false); @@ -728,15 +745,17 @@ private void testValidateRequestCapacityAgainstMinMaxAllocation(Class schedul .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, null)); - RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + RMApp app1 = rm.submitApp(GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); // Now request resource, memory > allowed boolean 
exception = false; try { - am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( - Resource.newInstance(9 * GB, 1)).numContainers(1).resourceName("*") - .build()), null); + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(Resource.newInstance(9 * GB, 1)) + .numContainers(1) + .resourceName("*") + .build()), null); } catch (InvalidResourceRequestException e) { exception = true; } @@ -744,10 +763,12 @@ private void testValidateRequestCapacityAgainstMinMaxAllocation(Class schedul exception = false; try { - // Now request resource, vcore > allowed - am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( - Resource.newInstance(8 * GB, 18)).numContainers(1).resourceName("*") - .build()), null); + // Now request resource, vcores > allowed + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(Resource.newInstance(8 * GB, 18)) + .numContainers(1) + .resourceName("*") + .build()), null); } catch (InvalidResourceRequestException e) { exception = true; } @@ -756,6 +777,72 @@ private void testValidateRequestCapacityAgainstMinMaxAllocation(Class schedul rm.close(); } + @Test + public void testValidateRequestCapacityAgainstMinMaxAllocationWithDifferentUnits() + throws Exception { + + // Initialize resource map for 3 types. 
+ Map riMap = new HashMap<>(); + + // Initialize mandatory resources + ResourceInformation memory = + ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(), + ResourceInformation.MEMORY_MB.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + ResourceInformation vcores = + ResourceInformation.newInstance(ResourceInformation.VCORES.getName(), + ResourceInformation.VCORES.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + ResourceInformation res_1 = + ResourceInformation.newInstance("res_1", "G", 0, 4); + riMap.put(ResourceInformation.MEMORY_URI, memory); + riMap.put(ResourceInformation.VCORES_URI, vcores); + riMap.put("res_1", res_1); + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + FairSchedulerConfiguration fsConf = + new FairSchedulerConfiguration(); + + YarnConfiguration conf = new YarnConfiguration(fsConf); + // Don't reset resource types since we have already configured resource + // types + conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false); + + MockRM rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("199.99.99.1:1234", + ResourceTypesTestHelper.newResource( + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + ImmutableMap. 
builder() + .put("res_1", "5G").build())); + + RMApp app1 = rm.submitApp(GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + // Now request res_1, 500M < 5G so it should be allowed + try { + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(ResourceTypesTestHelper.newResource(4 * GB, 1, + ImmutableMap. builder() + .put("res_1", "500M") + .build())) + .numContainers(1).resourceName("*").build()), null); + } catch (InvalidResourceRequestException e) { + fail( + "Allocate request should be accepted but exception was thrown: " + e); + } + + rm.close(); + } + @Test(timeout = 300000) public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypes() throws Exception { @@ -805,18 +892,21 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceType DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, ImmutableMap.of("res_1", 4))); - RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + RMApp app1 = rm.submitApp(GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); - Assert.assertEquals(Resource.newInstance(1 * GB, 1), + Assert.assertEquals(Resource.newInstance(GB, 1), leafQueue.getUsedResources()); // Now request resource, memory > allowed boolean exception = false; try { - am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( - TestUtils.createResource(9 * GB, 1, ImmutableMap.of("res_1", 1))) - .numContainers(1).resourceName("*").build()), null); + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(TestUtils.createResource(9 * GB, 1, + ImmutableMap.of("res_1", 1))) + .numContainers(1) + .resourceName("*") + .build()), null); } catch (InvalidResourceRequestException e) { exception = true; } @@ -824,11 +914,13 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceType exception = false; try { - // Now request resource, vcore > allowed - 
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( - TestUtils.createResource(8 * GB, 18, ImmutableMap.of("res_1", 1))) - .numContainers(1).resourceName("*") - .build()), null); + // Now request resource, vcores > allowed + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability( + TestUtils.createResource(8 * GB, 18, ImmutableMap.of("res_1", 1))) + .numContainers(1) + .resourceName("*") + .build()), null); } catch (InvalidResourceRequestException e) { exception = true; } @@ -837,10 +929,12 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceType exception = false; try { // Now request resource, res_1 > allowed - am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( - TestUtils.createResource(8 * GB, 1, ImmutableMap.of("res_1", 100))) - .numContainers(1).resourceName("*") - .build()), null); + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(TestUtils.createResource(8 * GB, 1, + ImmutableMap.of("res_1", 100))) + .numContainers(1) + .resourceName("*") + .build()), null); } catch (InvalidResourceRequestException e) { exception = true; } @@ -856,7 +950,7 @@ private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) { rmContainer.handle( new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); } else { - Assert.fail("Cannot find RMContainer"); + fail("Cannot find RMContainer"); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java index cb1f794190b..9cebc633c6f 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java @@ -26,7 +26,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.Arrays; @@ -35,6 +37,7 @@ import java.util.Map; import java.util.Set; +import com.google.common.collect.ImmutableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -42,6 +45,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; @@ -63,8 +67,10 @@ import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import 
org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; @@ -83,20 +89,79 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import org.junit.rules.ExpectedException; public class TestSchedulerUtils { private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class); - + public static Resource CONFIGURED_MAX_ALLOCATION; + + private static class CustomResourceTypesConfigurationProvider + extends LocalConfigurationProvider { + + @Override + public InputStream getConfigurationInputStream(Configuration bootstrapConf, + String name) throws YarnException, IOException { + if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) { + return new ByteArrayInputStream( + ("\n" + + " \n" + + " yarn.resource-types\n" + + " custom-resource-1," + + "custom-resource-2,custom-resource-3\n" + + " \n" + + " \n" + + " yarn.resource-types" + + ".custom-resource-1.units\n" + + " G\n" + + " \n" + + " \n" + + " yarn.resource-types" + + ".custom-resource-2.units\n" + + " G\n" + + " \n" + + "\n").getBytes()); + } else { + return super.getConfigurationInputStream(bootstrapConf, name); + } + } + } private RMContext rmContext = getMockRMContext(); + private static YarnConfiguration conf = new YarnConfiguration(); + @Rule + public ExpectedException exception = ExpectedException.none(); + + private void initResourceTypes() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + CustomResourceTypesConfigurationProvider.class.getName()); + 
ResourceUtils.resetResourceTypes(conf); + } + + @Before + public void setUp() { + initResourceTypes(); + //this needs to be initialized after initResourceTypes is called + CONFIGURED_MAX_ALLOCATION = Resource.newInstance(8192, 4, + ImmutableMap.builder() + .put("custom-resource-1", Long.MAX_VALUE) + .put("custom-resource-2", Long.MAX_VALUE) + .put("custom-resource-3", Long.MAX_VALUE) + .build()); + } + @Test (timeout = 30000) public void testNormalizeRequest() { ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); @@ -363,7 +428,7 @@ public void testValidateResourceRequestWithErrorLabelsPermission() rmContext.getNodeLabelManager().removeFromClusterNodeLabels( Arrays.asList("x")); } - Assert.assertTrue("InvalidLabelResourceRequestException excpeted", + Assert.assertTrue("InvalidLabelResourceRequestException expected", invalidlabelexception); // queue is "*", always succeeded try { @@ -775,6 +840,131 @@ public void testNormalizeNodeLabelExpression() } } + @Test + public void testAcceptOfCustomResourceRequestedUnitIsSmallerThanAvailableUnit() + throws InvalidResourceRequestException { + Resource requestedResource = + ResourceTypesTestHelper.newResource(1, 1, ImmutableMap + . builder().put("custom-resource-1", "11").build()); + + Resource availableResource = + ResourceTypesTestHelper.newResource(1, 1, ImmutableMap + . 
builder().put("custom-resource-1", "0G").build()); + + exception.expect(InvalidResourceRequestException.class); + exception.expectMessage(InvalidResourceRequestExceptionMessageGenerator + .create().withRequestedResourceType("custom-resource-1") + .withRequestedResource(requestedResource) + .withAvailableAllocation(availableResource) + .withConfiguredMaximumAllocation(CONFIGURED_MAX_ALLOCATION).build()); + + SchedulerUtils.checkResourceRequestAgainstAvailableResource( + requestedResource, availableResource); + } + + @Test + public void testAcceptOfCustomResourceRequestedUnitIsSmallerThanAvailableUnit2() { + Resource requestedResource = + ResourceTypesTestHelper.newResource(1, 1, ImmutableMap + . builder().put("custom-resource-1", "11").build()); + + Resource availableResource = + ResourceTypesTestHelper.newResource(1, 1, ImmutableMap + . builder().put("custom-resource-1", "1G").build()); + + try { + SchedulerUtils.checkResourceRequestAgainstAvailableResource( + requestedResource, availableResource); + } catch (InvalidResourceRequestException e) { + fail(String.format( + "Resource request should be accepted. Requested: %s, available: %s", + requestedResource, availableResource)); + } + } + + @Test + public void testAcceptOfCustomResourceRequestedUnitIsGreaterThanAvailableUnit() + throws InvalidResourceRequestException { + Resource requestedResource = + ResourceTypesTestHelper.newResource(1, 1, ImmutableMap + . builder().put("custom-resource-1", "1M").build()); + + Resource availableResource = ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap. 
builder().put("custom-resource-1", "120k") + .build()); + + exception.expect(InvalidResourceRequestException.class); + exception.expectMessage(InvalidResourceRequestExceptionMessageGenerator + .create().withRequestedResourceType("custom-resource-1") + .withRequestedResource(requestedResource) + .withAvailableAllocation(availableResource) + .withConfiguredMaximumAllocation(CONFIGURED_MAX_ALLOCATION).build()); + SchedulerUtils.checkResourceRequestAgainstAvailableResource( + requestedResource, availableResource); + } + + @Test + public void testAcceptOfCustomResourceRequestedUnitIsGreaterThanAvailableUnit2() { + Resource requestedResource = ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap. builder().put("custom-resource-1", "11M") + .build()); + + Resource availableResource = + ResourceTypesTestHelper.newResource(1, 1, ImmutableMap + . builder().put("custom-resource-1", "1G").build()); + + try { + SchedulerUtils.checkResourceRequestAgainstAvailableResource( + requestedResource, availableResource); + } catch (InvalidResourceRequestException e) { + fail(String.format( + "Resource request should be accepted. Requested: %s, available: %s", + requestedResource, availableResource)); + } + } + + @Test + public void testAcceptOfCustomResourceRequestedUnitIsSameAsAvailableUnit() { + Resource requestedResource = ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap. builder().put("custom-resource-1", "11M") + .build()); + + Resource availableResource = ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap. builder().put("custom-resource-1", "100M") + .build()); + + try { + SchedulerUtils.checkResourceRequestAgainstAvailableResource( + requestedResource, availableResource); + } catch (InvalidResourceRequestException e) { + fail(String.format( + "Resource request should be accepted. 
Requested: %s, available: %s", + requestedResource, availableResource)); + } + } + + @Test + public void testAcceptOfCustomResourceRequestedUnitIsSameAsAvailableUnit2() + throws InvalidResourceRequestException { + Resource requestedResource = ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap. builder().put("custom-resource-1", "110M") + .build()); + + Resource availableResource = ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap. builder().put("custom-resource-1", "100M") + .build()); + + exception.expect(InvalidResourceRequestException.class); + exception.expectMessage(InvalidResourceRequestExceptionMessageGenerator + .create().withRequestedResourceType("custom-resource-1") + .withRequestedResource(requestedResource) + .withAvailableAllocation(availableResource) + .withConfiguredMaximumAllocation(CONFIGURED_MAX_ALLOCATION).build()); + + SchedulerUtils.checkResourceRequestAgainstAvailableResource( + requestedResource, availableResource); + } + public static void waitSchedulerApplicationAttemptStopped( AbstractYarnScheduler ys, ApplicationAttemptId attemptId) throws InterruptedException { @@ -832,4 +1022,61 @@ private static RMContext getMockRMContext() { when(rmContext.getNodeLabelManager()).thenReturn(nlm); return rmContext; } + + private static class InvalidResourceRequestExceptionMessageGenerator { + + private StringBuilder sb; + private Resource requestedResource; + private Resource availableAllocation; + private Resource configuredMaxAllowedAllocation; + private String resourceType; + + InvalidResourceRequestExceptionMessageGenerator(StringBuilder + sb) { + this.sb = sb; + } + + public static InvalidResourceRequestExceptionMessageGenerator create() { + return new InvalidResourceRequestExceptionMessageGenerator( + new StringBuilder()); + } + + InvalidResourceRequestExceptionMessageGenerator withRequestedResource( + Resource r) { + this.requestedResource = r; + return this; + } + + InvalidResourceRequestExceptionMessageGenerator 
withRequestedResourceType( + String resourceType) { + this.resourceType = resourceType; + return this; + } + + InvalidResourceRequestExceptionMessageGenerator withAvailableAllocation( + Resource r) { + this.availableAllocation = r; + return this; + } + + InvalidResourceRequestExceptionMessageGenerator withConfiguredMaximumAllocation( + Resource r) { + this.configuredMaxAllowedAllocation = r; + return this; + } + + public String build() { + return sb + .append("Invalid resource request, requested resource type=[") + .append(resourceType).append("]") + .append(" < 0 or greater than maximum allowed allocation. ") + .append("Requested resource=").append(requestedResource).append(", ") + .append("maximum allowed allocation=").append(availableAllocation) + .append(", please note that maximum allowed allocation is calculated " + + "by scheduler based on maximum resource of " + + "registered NodeManagers, which might be less than " + + "configured maximum allocation=") + .append(configuredMaxAllowedAllocation).toString(); + } + } }