diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 8b1b709..00109ca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -36,6 +36,7 @@ * *

* @@ -72,6 +73,19 @@ public void setMaximumResourceCapability(Resource capability); /** + * Get the increment capability for any {@link Resource} allocated by the + * ResourceManager in the cluster. + * @return incremente capability of allocated resources in the cluster + */ + @Public + @Stable + public Resource getIncrementResourceCapability(); + + @Private + @Unstable + public void setIncrementResourceCapability(Resource capability); + + /** * Get the ApplicationACLs for the application. * @return all the ApplicationACLs */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java index b861282..f0f4acd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java @@ -46,6 +46,7 @@ private Resource minimumResourceCapability; private Resource maximumResourceCapability; + private Resource incrementResourceCapability; private Map applicationACLS = null; public RegisterApplicationMasterResponsePBImpl() { @@ -79,7 +80,11 @@ private void mergeLocalToBuilder() { } if (this.maximumResourceCapability != null) { builder.setMaximumCapability( - convertToProtoFormat(this.maximumResourceCapability)); + convertToProtoFormat(this.maximumResourceCapability)); + } + if (this.incrementResourceCapability != null) { + builder.setIncrementCapability( + convertToProtoFormat(this.incrementResourceCapability)); } if (this.applicationACLS != null) { addApplicationACLs(); @@ -125,6 +130,22 @@ public Resource getMinimumResourceCapability() { } @Override + public Resource getIncrementResourceCapability() { + if (this.incrementResourceCapability != null) { + return this.incrementResourceCapability; + } + + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasIncrementCapability()) { + return null; + } + + this.incrementResourceCapability + = convertFromProtoFormat(p.getIncrementCapability()); + return this.incrementResourceCapability; + } + + @Override public void setMaximumResourceCapability(Resource capability) { maybeInitBuilder(); if(maximumResourceCapability == null) { @@ -142,6 +163,14 @@ public void setMinimumResourceCapability(Resource capability) { this.minimumResourceCapability = capability; } + @Override + public void setIncrementResourceCapability(Resource capability) { + maybeInitBuilder(); + if(incrementResourceCapability == null) { + builder.clearIncrementCapability(); + } + this.incrementResourceCapability = capability; + } @Override public Map getApplicationACLs() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fa1e740..6f9b2b1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -123,6 +123,12 @@ YARN_PREFIX + "scheduler.maximum-allocation-vcores"; public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES = 4; + /** Increment request grant-able by the RM scheduler. */ + public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_MB = + YARN_PREFIX + "scheduler.increment-allocation-mb"; + public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = + YARN_PREFIX + "scheduler.increment-allocation-vcores"; + /** Number of threads to handle scheduler interface.*/ public static final String RM_SCHEDULER_CLIENT_THREAD_COUNT = RM_PREFIX + "scheduler.client.thread-count"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index ed3f871..7bf9a3f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -38,6 +38,7 @@ message RegisterApplicationMasterResponseProto { optional ResourceProto minimumCapability = 1; optional ResourceProto maximumCapability = 2; repeated ApplicationACLMapProto application_ACLs = 3; + optional ResourceProto incrementCapability = 4; } message FinishApplicationMasterRequestProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMinMaxIncrResourcesWithFS.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMinMaxIncrResourcesWithFS.java new file mode 100644 index 0000000..0cb97ee --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMinMaxIncrResourcesWithFS.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.service.Service.STATE; +import org.apache.hadoop.yarn.util.Records; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class TestMinMaxIncrResourcesWithFS { + static Configuration conf = null; + static MiniYARNCluster yarnCluster = null; + static YarnClientImpl yarnClient = null; + static ApplicationAttemptId attemptId = null; + static int nodeCount = 3; + + @BeforeClass + public static void setup() throws Exception { + // start minicluster + conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); + conf.setInt(YarnConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 2); + conf.setInt(YarnConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 3); + yarnCluster = new MiniYARNCluster(TestMinMaxIncrResourcesWithFS.class.getName(), + nodeCount, 1, 1); + yarnCluster.init(conf); + yarnCluster.start(); + + // start rm client + yarnClient = new YarnClientImpl(); + yarnClient.init(conf); + yarnClient.start(); + + } + + @Before + public void startApp() throws Exception { + // submit new app + GetNewApplicationResponse newApp = yarnClient.getNewApplication(); + ApplicationId appId = newApp.getApplicationId(); + + ApplicationSubmissionContext appContext = Records + .newRecord(ApplicationSubmissionContext.class); + // set the application id + appContext.setApplicationId(appId); + // set the application name + appContext.setApplicationName("Test"); + // Set the priority for the application master + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(0); + appContext.setPriority(pri); + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue("default"); + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records + .newRecord(ContainerLaunchContext.class); + appContext.setAMContainerSpec(amContainer); + // unmanaged AM + appContext.setUnmanagedAM(true); + // Create the request to send to the applications manager + SubmitApplicationRequest appRequest = Records + .newRecord(SubmitApplicationRequest.class); + appRequest.setApplicationSubmissionContext(appContext); + // Submit the application to the applications manager + yarnClient.submitApplication(appContext); + + // wait for app to start + while (true) { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) { + attemptId = appReport.getCurrentApplicationAttemptId(); + break; + } + } + } + + @After + public void cancelApp() { + attemptId = null; + } + + @AfterClass + public static void tearDown() { + if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) { + yarnClient.stop(); + } + if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) { + yarnCluster.stop(); + } + } + + + @Test (timeout=60000) + public void testMinMaxIncrResources() throws YarnRemoteException, IOException { + AMRMClientImpl amClient = null; + try { + // start am rm client + amClient = new AMRMClientImpl(attemptId); + amClient.init(conf); + amClient.start(); + + RegisterApplicationMasterResponse response = + amClient.registerApplicationMaster("Host", 10000, ""); + + int minMem = response.getMinimumResourceCapability().getMemory(); + int maxMem = response.getMaximumResourceCapability().getMemory(); + int incrMem = response.getIncrementResourceCapability().getMemory(); + int minCPUs = response.getMinimumResourceCapability().getVirtualCores(); + int maxCPUs = response.getMaximumResourceCapability().getVirtualCores(); + int incrCPUs = response.getIncrementResourceCapability().getVirtualCores(); + assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + minMem); + assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + maxMem); + assertEquals(2, incrMem); + assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + minCPUs); + assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + maxCPUs); + assertEquals(3, incrCPUs); + + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + + } finally { + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ClusterInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ClusterInfo.java index 64c7ca7..b946dce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ClusterInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ClusterInfo.java @@ -24,15 +24,18 @@ public class ClusterInfo { private Resource minContainerCapability; private Resource maxContainerCapability; + private Resource incrContainerCapacity; public ClusterInfo() { this.minContainerCapability = Records.newRecord(Resource.class); this.maxContainerCapability = Records.newRecord(Resource.class); + this.incrContainerCapacity = Records.newRecord(Resource.class); } public ClusterInfo(Resource minCapability, Resource maxCapability) { this.minContainerCapability = minCapability; this.maxContainerCapability = maxCapability; + this.incrContainerCapacity = minCapability; } public Resource getMinContainerCapability() { @@ -50,4 +53,13 @@ public Resource getMaxContainerCapability() { public void setMaxContainerCapability(Resource maxContainerCapability) { this.maxContainerCapability = maxContainerCapability; } + + public Resource getIncrContainerCapability() { + return incrContainerCapacity; + } + + public void setIncrContainerCapability(Resource incrContainerCapacity) { + this.incrContainerCapacity = incrContainerCapacity; + } + } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index d71d193..e584b44 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -228,7 +228,7 @@ in terms of virtual CPU cores. Requests higher than this won't take effect, and will get capped to this value. yarn.scheduler.maximum-allocation-vcores - 32 + 4 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 3094a93..0aa8e72 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -211,6 +211,8 @@ public RegisterApplicationMasterResponse registerApplicationMaster( .getMinimumResourceCapability()); response.setMaximumResourceCapability(rScheduler .getMaximumResourceCapability()); + response.setIncrementResourceCapability(rScheduler + .getIncrementResourceCapability()); response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId) .getSubmissionContext().getAMContainerSpec().getApplicationACLs()); return response; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java index 5a691af..57d82dd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java @@ -54,19 +54,17 @@ public Resource divideAndCeil(Resource numerator, int denominator) { @Override public Resource normalize(Resource r, Resource minimumResource, - Resource maximumResource) { - int normalizedMemory = Math.min( - roundUp( - Math.max(r.getMemory(), minimumResource.getMemory()), - minimumResource.getMemory()), - maximumResource.getMemory()); + Resource maximumResource, Resource stepFactor) { + int normalizedMemory = normalizeInt(r.getMemory(), + minimumResource.getMemory(), maximumResource.getMemory(), + stepFactor.getMemory()); return Resources.createResource(normalizedMemory); } @Override - public Resource roundUp(Resource r, Resource minimumResource) { + public Resource roundUp(Resource r, Resource stepFactor) { return Resources.createResource( - roundUp(r.getMemory(),minimumResource.getMemory()) + roundUp(r.getMemory(), stepFactor.getMemory()) ); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java index 2f66990..eaa7370 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java @@ -124,26 +124,22 @@ public Resource divideAndCeil(Resource numerator, int denominator) { @Override public Resource normalize(Resource r, Resource minimumResource, - Resource maximumResource) { - int normalizedMemory = Math.min( - roundUp( - Math.max(r.getMemory(), minimumResource.getMemory()), - minimumResource.getMemory()), - maximumResource.getMemory()); - int normalizedCores = Math.min( - roundUp( - Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()), - minimumResource.getVirtualCores()), - maximumResource.getVirtualCores()); + Resource maximumResource, Resource stepFactor) { + int normalizedMemory = normalizeInt(r.getMemory(), + minimumResource.getMemory(), maximumResource.getMemory(), + stepFactor.getMemory()); + int normalizedCores = normalizeInt(r.getVirtualCores(), + minimumResource.getVirtualCores(), maximumResource.getVirtualCores(), + stepFactor.getVirtualCores()); return Resources.createResource(normalizedMemory, normalizedCores); } @Override - public Resource roundUp(Resource r, Resource minimumResource) { + public Resource roundUp(Resource r, Resource stepFactor) { return Resources.createResource( - roundUp(r.getMemory(), minimumResource.getMemory()), - roundUp(r.getVirtualCores(), minimumResource.getVirtualCores()) + roundUp(r.getMemory(), stepFactor.getMemory()), + roundUp(r.getVirtualCores(), stepFactor.getVirtualCores()) ); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java index b2dd19b..ad4fead 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java @@ -51,6 +51,22 @@ public static int roundDown(int a, int b) { return (a / b) * b; } + public static int normalizeInt(int value, int min, int max, int increment) { + if (value > min) { + if (value < max) { + value = roundUp(value, increment); + if (value > max) { + value = max; + } + } else { + value = max; + } + } else { + value = min; + } + return value; + } + /** * Compute the number of containers which can be allocated given * available and required resources. @@ -94,10 +110,11 @@ public abstract Resource multiplyAndNormalizeDown( * @param r resource * @param minimumResource step-factor * @param maximumResource the upper bound of the resource to be allocated + * @param stepFactor the increment for resources to be allocated * @return normalized resource */ public abstract Resource normalize(Resource r, Resource minimumResource, - Resource maximumResource); + Resource maximumResource, Resource stepFactor); /** * Round-up resource r given factor stepFactor. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java index 58cd676..5e67098 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java @@ -132,9 +132,9 @@ public static Resource multiplyAndRoundDown(Resource lhs, double by) { } public static Resource normalize( - ResourceCalculator calculator, Resource lhs, Resource factor, - Resource limit) { - return calculator.normalize(lhs, factor, limit); + ResourceCalculator calculator, Resource lhs, Resource min, + Resource max, Resource increment) { + return calculator.normalize(lhs, min, max, increment); } public static Resource roundUp( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index c0a54c7..0426761 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -90,11 +90,12 @@ public static void normalizeRequests( ResourceCalculator resourceCalculator, Resource clusterResource, Resource minimumResource, - Resource maximumResource) { + Resource maximumResource, + Resource incrementResource) { for (ResourceRequest ask : asks) { normalizeRequest( ask, resourceCalculator, clusterResource, minimumResource, - maximumResource); + maximumResource, incrementResource); } } @@ -107,11 +108,12 @@ public static void normalizeRequest( ResourceCalculator resourceCalculator, Resource clusterResource, Resource minimumResource, - Resource maximumResource) { + Resource maximumResource, + Resource incrementResource) { Resource normalized = Resources.normalize( resourceCalculator, ask.getCapability(), minimumResource, - maximumResource); + maximumResource, incrementResource); ask.setCapability(normalized); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index f084649..cfa9631 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -80,6 +80,14 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, public Resource getMaximumResourceCapability(); /** + * Get increment allocatable {@link Resource}. + * @return increment allocatable resource + */ + @Public + @Stable + public Resource getIncrementResourceCapability(); + + /** * Get the number of nodes available in the cluster. * @return the number of available nodes. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index aca2a12..5dce033 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -177,6 +177,11 @@ public Resource getMaximumResourceCapability() { } @Override + public Resource getIncrementResourceCapability() { + return minimumAllocation; + } + + @Override public Comparator getApplicationComparator() { return applicationComparator; } @@ -484,7 +489,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, // Sanity check SchedulerUtils.normalizeRequests( ask, calculator, getClusterResources(), minimumAllocation, - maximumAllocation); + maximumAllocation, minimumAllocation); // Release containers for (ContainerId releasedContainerId : release) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f8cff0e..6c31276 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -115,6 +115,7 @@ private RMContext rmContext; private Resource minimumAllocation; private Resource maximumAllocation; + private Resource incrAllocation; private QueueManager queueMgr; private Clock clock; @@ -526,6 +527,11 @@ public Resource getMaximumResourceCapability() { return maximumAllocation; } + @Override + public Resource getIncrementResourceCapability() { + return incrAllocation; + } + public double getNodeLocalityThreshold() { return nodeLocalityThreshold; } @@ -730,7 +736,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, // Sanity check SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), - clusterCapacity, minimumAllocation, maximumAllocation); + clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation); // Release containers for (ContainerId releasedContainerId : release) { @@ -988,6 +994,7 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext) this.conf = new FairSchedulerConfiguration(conf); minimumAllocation = this.conf.getMinimumAllocation(); maximumAllocation = this.conf.getMaximumAllocation(); + incrAllocation = this.conf.getIncrementAllocation(); userAsDefaultQueue = this.conf.getUserAsDefaultQueue(); nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); rackLocalityThreshold = this.conf.getLocalityThresholdRack(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 5696746..39b35b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -102,6 +102,17 @@ public Resource getMaximumAllocation() { return Resources.createResource(mem, cpu); } + public Resource getIncrementAllocation() { + Resource minimum = getMinimumAllocation(); + int incrementMemory = getInt( + YarnConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + minimum.getMemory()); + int incrementCores = getInt( + YarnConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, + minimum.getVirtualCores()); + return Resources.createResource(incrementMemory, incrementCores); + } + public boolean getUserAsDefaultQueue() { return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 2f4e70d..1e3015c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -196,6 +196,11 @@ public Resource getMaximumResourceCapability() { } @Override + public Resource getIncrementResourceCapability() { + return minimumAllocation; + } + + @Override public synchronized void reinitialize(Configuration conf, RMContext rmContext) throws IOException { @@ -232,7 +237,8 @@ public Allocation allocate( // Sanity check SchedulerUtils.normalizeRequests(ask, resourceCalculator, - clusterResource, minimumAllocation, maximumAllocation); + clusterResource, minimumAllocation, maximumAllocation, + minimumAllocation); // Release containers for (ContainerId releasedContainer : release) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java index f17da43..fcb4866 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java @@ -42,45 +42,47 @@ public void testNormalizeRequest() { final int minMemory = 1024; final int maxMemory = 8192; + final int incrMemory = 256; Resource minResource = Resources.createResource(minMemory, 0); Resource maxResource = Resources.createResource(maxMemory, 0); + Resource incrResource = Resources.createResource(incrMemory, 0); ResourceRequest ask = new ResourceRequestPBImpl(); // case negative memory ask.setCapability(Resources.createResource(-1024)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, - maxResource); + maxResource, incrResource); assertEquals(minMemory, ask.getCapability().getMemory()); // case zero memory ask.setCapability(Resources.createResource(0)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, - maxResource); + maxResource, incrResource); assertEquals(minMemory, ask.getCapability().getMemory()); // case memory is a multiple of minMemory ask.setCapability(Resources.createResource(2 * minMemory)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, - maxResource); + maxResource, incrResource); assertEquals(2 * minMemory, ask.getCapability().getMemory()); // case memory is not a multiple of minMemory ask.setCapability(Resources.createResource(minMemory + 10)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, - maxResource); - assertEquals(2 * minMemory, ask.getCapability().getMemory()); + maxResource, incrResource); + assertEquals(minMemory + incrMemory, ask.getCapability().getMemory()); // case memory is equal to max allowed ask.setCapability(Resources.createResource(maxMemory)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, - maxResource); + maxResource, incrResource); assertEquals(maxMemory, ask.getCapability().getMemory()); // case memory is just less than max ask.setCapability(Resources.createResource(maxMemory - 10)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, - maxResource); + maxResource, incrResource); assertEquals(maxMemory, ask.getCapability().getMemory()); // max is not a multiple of min @@ -88,14 +90,14 @@ public void testNormalizeRequest() { ask.setCapability(Resources.createResource(maxMemory - 100)); // multiple of minMemory > maxMemory, then reduce to maxMemory SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, - maxResource); + maxResource, incrResource); assertEquals(maxResource.getMemory(), ask.getCapability().getMemory()); // ask is more than max maxResource = Resources.createResource(maxMemory, 0); ask.setCapability(Resources.createResource(maxMemory + 100)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, - maxResource); + maxResource, incrResource); assertEquals(maxResource.getMemory(), ask.getCapability().getMemory()); } @@ -105,6 +107,7 @@ public void testNormalizeRequestWithDominantResourceCalculator() { Resource minResource = Resources.createResource(1024, 1); Resource maxResource = Resources.createResource(10240, 10); + Resource incrResource = Resources.createResource(256, 2); Resource clusterResource = Resources.createResource(10 * 1024, 10); ResourceRequest ask = new ResourceRequestPBImpl(); @@ -112,24 +115,27 @@ public void testNormalizeRequestWithDominantResourceCalculator() { // case negative memory/vcores ask.setCapability(Resources.createResource(-1024, -1)); SchedulerUtils.normalizeRequest( - ask, resourceCalculator, clusterResource, minResource, maxResource); + ask, resourceCalculator, clusterResource, minResource, maxResource, + incrResource); assertEquals(minResource, ask.getCapability()); // case zero memory/vcores ask.setCapability(Resources.createResource(0, 0)); SchedulerUtils.normalizeRequest( - ask, resourceCalculator, clusterResource, minResource, maxResource); + ask, resourceCalculator, clusterResource, minResource, maxResource, + incrResource); assertEquals(minResource, ask.getCapability()); assertEquals(1, ask.getCapability().getVirtualCores()); assertEquals(1024, ask.getCapability().getMemory()); // case non-zero memory & zero cores - ask.setCapability(Resources.createResource(1536, 0)); + ask.setCapability(Resources.createResource(1200, 0)); SchedulerUtils.normalizeRequest( - ask, resourceCalculator, clusterResource, minResource, maxResource); - assertEquals(Resources.createResource(2048, 1), ask.getCapability()); + ask, resourceCalculator, clusterResource, minResource, maxResource, + incrResource); + assertEquals(Resources.createResource(1280, 1), ask.getCapability()); assertEquals(1, ask.getCapability().getVirtualCores()); - assertEquals(2048, ask.getCapability().getMemory()); + assertEquals(1280, ask.getCapability().getMemory()); } @Test (timeout = 30000) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index e2c753f..b055be6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -236,6 +236,7 @@ public void testLoadConfigurationOnInitialize() throws IOException { conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + conf.setInt(YarnConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 128); scheduler.reinitialize(conf, resourceManager.getRMContext()); Assert.assertEquals(true, scheduler.assignMultiple); Assert.assertEquals(3, scheduler.maxAssign); @@ -244,6 +245,7 @@ public void testLoadConfigurationOnInitialize() throws IOException { Assert.assertEquals(.7, scheduler.rackLocalityThreshold, .01); Assert.assertEquals(1024, scheduler.getMaximumResourceCapability().getMemory()); Assert.assertEquals(512, scheduler.getMinimumResourceCapability().getMemory()); + Assert.assertEquals(128, scheduler.getIncrementResourceCapability().getMemory()); } @Test