diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java index 294e621..868ada9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java @@ -33,8 +33,9 @@ public int compare(Resource unused, Resource lhs, Resource rhs) { @Override public int computeAvailableContainers(Resource available, Resource required) { - // Only consider memory - return available.getMemory() / required.getMemory(); + return Math.min( + available.getMemory() / required.getMemory(), + available.getVirtualCores() / required.getVirtualCores()); } @Override @@ -45,13 +46,19 @@ public float divide(Resource unused, @Override public float ratio(Resource a, Resource b) { - return (float)a.getMemory() / b.getMemory(); + return Math.max( + (float)a.getMemory()/b.getMemory(), + (float)a.getVirtualCores()/b.getVirtualCores() + ); + } @Override public Resource divideAndCeil(Resource numerator, int denominator) { return Resources.createResource( - divideAndCeil(numerator.getMemory(), denominator)); + divideAndCeil(numerator.getMemory(), denominator), + divideAndCeil(numerator.getVirtualCores(), denominator) + ); } @Override @@ -59,10 +66,16 @@ public Resource normalize(Resource r, Resource minimumResource, Resource maximumResource, Resource stepFactor) { int normalizedMemory = Math.min( roundUp( - Math.max(r.getMemory(), minimumResource.getMemory()), - stepFactor.getMemory()), - maximumResource.getMemory()); - return Resources.createResource(normalizedMemory); + Math.max(r.getMemory(), minimumResource.getMemory()), + stepFactor.getMemory()), + maximumResource.getMemory()); + int normalizedCores = Math.min( + roundUp( + Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()), + stepFactor.getVirtualCores()), + maximumResource.getVirtualCores()); + return Resources.createResource(normalizedMemory, + normalizedCores); } @Override @@ -74,21 +87,28 @@ public Resource normalize(Resource r, Resource minimumResource, @Override public Resource roundUp(Resource r, Resource stepFactor) { return Resources.createResource( - roundUp(r.getMemory(), stepFactor.getMemory()) + roundUp(r.getMemory(), stepFactor.getMemory()), + roundUp(r.getVirtualCores(), stepFactor.getVirtualCores()) ); } @Override public Resource roundDown(Resource r, Resource stepFactor) { return Resources.createResource( - roundDown(r.getMemory(), stepFactor.getMemory())); + roundDown(r.getMemory(), stepFactor.getMemory()), + roundDown(r.getVirtualCores(), stepFactor.getVirtualCores()) + ); } @Override public Resource multiplyAndNormalizeUp(Resource r, double by, Resource stepFactor) { return Resources.createResource( - roundUp((int)(r.getMemory() * by + 0.5), stepFactor.getMemory()) + roundUp( + (int)Math.ceil(r.getMemory() * by), stepFactor.getMemory()), + roundUp( + (int)Math.ceil(r.getVirtualCores() * by), + stepFactor.getVirtualCores()) ); } @@ -99,6 +119,10 @@ public Resource multiplyAndNormalizeDown(Resource r, double by, roundDown( (int)(r.getMemory() * by), stepFactor.getMemory() + ), + roundDown( + (int)(r.getVirtualCores() * by), + stepFactor.getVirtualCores() ) ); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 86e1b1e..2b2e8f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -246,4 +246,77 @@ protected RMSecretManagerService createRMSecretManagerService() { rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED); MockRM.launchAndRegisterAM(app1, rm1, nm1); } + + @Test(timeout = 3000000) + public void testContainerRequestBeyondAvailableVcores() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + MockRM rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 3 * GB, 2); + MockNM nm2 = rm.registerNode("127.0.0.1:2234", 1 * GB, 1); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + int waitCount = 20; + int size = rm.getRMContext().getRMNodes().size(); + while ((size = rm.getRMContext().getRMNodes().size()) != 2 + && waitCount-- > 0) { + LOG.info("Waiting for node managers to register : " + size); + Thread.sleep(100); + } + Assert.assertEquals(2, rm.getRMContext().getRMNodes().size()); + + // Submit an application + RMApp app1 = rm.submitApp(128); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + LOG.info("sending container requests "); + am1.addRequests(new String[] {"*"}, 1 * GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + // kick the scheduler + nm1.nodeHeartbeat(true); + int waitCounter = 20; + LOG.info("heartbeating nm1"); + while (alloc1Response.getAllocatedContainers().size() < 1 + && waitCounter-- > 0) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(500); + alloc1Response = am1.schedule(); + } + LOG.info("received container : " + + alloc1Response.getAllocatedContainers().size()); + Assert.assertEquals(1, alloc1Response.getAllocatedContainers().size()); + + // kick the scheduling + LOG.info("sending container requests "); + am1.addRequests(new String[] {"*"}, 1 * GB, 1, 1); + alloc1Response = am1.schedule(); // send the request + + // kick the scheduler + nm1.nodeHeartbeat(true); + waitCounter = 20; + LOG.info("heartbeating nm1"); + while (alloc1Response.getAllocatedContainers().size() < 1 + && waitCounter-- > 0) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(500); + alloc1Response = am1.schedule(); + } + LOG.info("received container : " + + alloc1Response.getAllocatedContainers().size()); + + // No container should be allocated. + Assert.assertEquals(0, alloc1Response.getAllocatedContainers().size()); + + rm.stop(); + rm.close(); + } } \ No newline at end of file