diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
index 8b1b709..00109ca 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
@@ -36,6 +36,7 @@
*
* - Minimum capability for allocated resources in the cluster.
* - Maximum capability for allocated resources in the cluster.
+ * - Increment capability for allocated resources in the cluster.
*
*
*
@@ -72,6 +73,19 @@
public void setMaximumResourceCapability(Resource capability);
/**
+ * Get the increment capability for any {@link Resource} allocated by the
+ * ResourceManager in the cluster.
+ * @return incremente capability of allocated resources in the cluster
+ */
+ @Public
+ @Stable
+ public Resource getIncrementResourceCapability();
+
+ @Private
+ @Unstable
+ public void setIncrementResourceCapability(Resource capability);
+
+ /**
* Get the ApplicationACLs for the application.
* @return all the ApplicationACLs
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
index b861282..f0f4acd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
@@ -46,6 +46,7 @@
private Resource minimumResourceCapability;
private Resource maximumResourceCapability;
+ private Resource incrementResourceCapability;
private Map applicationACLS = null;
public RegisterApplicationMasterResponsePBImpl() {
@@ -79,7 +80,11 @@ private void mergeLocalToBuilder() {
}
if (this.maximumResourceCapability != null) {
builder.setMaximumCapability(
- convertToProtoFormat(this.maximumResourceCapability));
+ convertToProtoFormat(this.maximumResourceCapability));
+ }
+ if (this.incrementResourceCapability != null) {
+ builder.setIncrementCapability(
+ convertToProtoFormat(this.incrementResourceCapability));
}
if (this.applicationACLS != null) {
addApplicationACLs();
@@ -125,6 +130,22 @@ public Resource getMinimumResourceCapability() {
}
@Override
+ public Resource getIncrementResourceCapability() {
+ if (this.incrementResourceCapability != null) {
+ return this.incrementResourceCapability;
+ }
+
+ RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasIncrementCapability()) {
+ return null;
+ }
+
+ this.incrementResourceCapability
+ = convertFromProtoFormat(p.getIncrementCapability());
+ return this.incrementResourceCapability;
+ }
+
+ @Override
public void setMaximumResourceCapability(Resource capability) {
maybeInitBuilder();
if(maximumResourceCapability == null) {
@@ -142,6 +163,14 @@ public void setMinimumResourceCapability(Resource capability) {
this.minimumResourceCapability = capability;
}
+ @Override
+ public void setIncrementResourceCapability(Resource capability) {
+ maybeInitBuilder();
+ if(incrementResourceCapability == null) {
+ builder.clearIncrementCapability();
+ }
+ this.incrementResourceCapability = capability;
+ }
@Override
public Map getApplicationACLs() {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index fa1e740..6f9b2b1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -123,6 +123,12 @@
YARN_PREFIX + "scheduler.maximum-allocation-vcores";
public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES = 4;
+ /** Increment request grant-able by the RM scheduler. */
+ public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_MB =
+ YARN_PREFIX + "scheduler.increment-allocation-mb";
+ public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES =
+ YARN_PREFIX + "scheduler.increment-allocation-vcores";
+
/** Number of threads to handle scheduler interface.*/
public static final String RM_SCHEDULER_CLIENT_THREAD_COUNT =
RM_PREFIX + "scheduler.client.thread-count";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index ed3f871..7bf9a3f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -38,6 +38,7 @@ message RegisterApplicationMasterResponseProto {
optional ResourceProto minimumCapability = 1;
optional ResourceProto maximumCapability = 2;
repeated ApplicationACLMapProto application_ACLs = 3;
+ optional ResourceProto incrementCapability = 4;
}
message FinishApplicationMasterRequestProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMinMaxIncrResourcesWithFS.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMinMaxIncrResourcesWithFS.java
new file mode 100644
index 0000000..0cb97ee
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMinMaxIncrResourcesWithFS.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestMinMaxIncrResourcesWithFS {
+ static Configuration conf = null;
+ static MiniYARNCluster yarnCluster = null;
+ static YarnClientImpl yarnClient = null;
+ static ApplicationAttemptId attemptId = null;
+ static int nodeCount = 3;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ // start minicluster
+ conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 2);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 3);
+ yarnCluster = new MiniYARNCluster(TestMinMaxIncrResourcesWithFS.class.getName(),
+ nodeCount, 1, 1);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+
+ // start rm client
+ yarnClient = new YarnClientImpl();
+ yarnClient.init(conf);
+ yarnClient.start();
+
+ }
+
+ @Before
+ public void startApp() throws Exception {
+ // submit new app
+ GetNewApplicationResponse newApp = yarnClient.getNewApplication();
+ ApplicationId appId = newApp.getApplicationId();
+
+ ApplicationSubmissionContext appContext = Records
+ .newRecord(ApplicationSubmissionContext.class);
+ // set the application id
+ appContext.setApplicationId(appId);
+ // set the application name
+ appContext.setApplicationName("Test");
+ // Set the priority for the application master
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(0);
+ appContext.setPriority(pri);
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue("default");
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records
+ .newRecord(ContainerLaunchContext.class);
+ appContext.setAMContainerSpec(amContainer);
+ // unmanaged AM
+ appContext.setUnmanagedAM(true);
+ // Create the request to send to the applications manager
+ SubmitApplicationRequest appRequest = Records
+ .newRecord(SubmitApplicationRequest.class);
+ appRequest.setApplicationSubmissionContext(appContext);
+ // Submit the application to the applications manager
+ yarnClient.submitApplication(appContext);
+
+ // wait for app to start
+ while (true) {
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+ attemptId = appReport.getCurrentApplicationAttemptId();
+ break;
+ }
+ }
+ }
+
+ @After
+ public void cancelApp() {
+ attemptId = null;
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
+ yarnClient.stop();
+ }
+ if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) {
+ yarnCluster.stop();
+ }
+ }
+
+
+ @Test (timeout=60000)
+ public void testMinMaxIncrResources() throws YarnRemoteException, IOException {
+ AMRMClientImpl amClient = null;
+ try {
+ // start am rm client
+ amClient = new AMRMClientImpl(attemptId);
+ amClient.init(conf);
+ amClient.start();
+
+ RegisterApplicationMasterResponse response =
+ amClient.registerApplicationMaster("Host", 10000, "");
+
+ int minMem = response.getMinimumResourceCapability().getMemory();
+ int maxMem = response.getMaximumResourceCapability().getMemory();
+ int incrMem = response.getIncrementResourceCapability().getMemory();
+ int minCPUs = response.getMinimumResourceCapability().getVirtualCores();
+ int maxCPUs = response.getMaximumResourceCapability().getVirtualCores();
+ int incrCPUs = response.getIncrementResourceCapability().getVirtualCores();
+ assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ minMem);
+ assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ maxMem);
+ assertEquals(2, incrMem);
+ assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ minCPUs);
+ assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ maxCPUs);
+ assertEquals(3, incrCPUs);
+
+ amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+ null, null);
+
+ } finally {
+ if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+ amClient.stop();
+ }
+ }
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ClusterInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ClusterInfo.java
index 64c7ca7..b946dce 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ClusterInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ClusterInfo.java
@@ -24,15 +24,18 @@
public class ClusterInfo {
private Resource minContainerCapability;
private Resource maxContainerCapability;
+ private Resource incrContainerCapacity;
public ClusterInfo() {
this.minContainerCapability = Records.newRecord(Resource.class);
this.maxContainerCapability = Records.newRecord(Resource.class);
+ this.incrContainerCapacity = Records.newRecord(Resource.class);
}
public ClusterInfo(Resource minCapability, Resource maxCapability) {
this.minContainerCapability = minCapability;
this.maxContainerCapability = maxCapability;
+ this.incrContainerCapacity = minCapability;
}
public Resource getMinContainerCapability() {
@@ -50,4 +53,13 @@ public Resource getMaxContainerCapability() {
public void setMaxContainerCapability(Resource maxContainerCapability) {
this.maxContainerCapability = maxContainerCapability;
}
+
+ public Resource getIncrContainerCapability() {
+ return incrContainerCapacity;
+ }
+
+ public void setIncrContainerCapability(Resource incrContainerCapacity) {
+ this.incrContainerCapacity = incrContainerCapacity;
+ }
+
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index d71d193..e584b44 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -228,7 +228,7 @@
in terms of virtual CPU cores. Requests higher than this won't take effect,
and will get capped to this value.
yarn.scheduler.maximum-allocation-vcores
- 32
+ 4
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 3094a93..0aa8e72 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -211,6 +211,8 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
.getMinimumResourceCapability());
response.setMaximumResourceCapability(rScheduler
.getMaximumResourceCapability());
+ response.setIncrementResourceCapability(rScheduler
+ .getIncrementResourceCapability());
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
.getSubmissionContext().getAMContainerSpec().getApplicationACLs());
return response;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java
index 5a691af..57d82dd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java
@@ -54,19 +54,17 @@ public Resource divideAndCeil(Resource numerator, int denominator) {
@Override
public Resource normalize(Resource r, Resource minimumResource,
- Resource maximumResource) {
- int normalizedMemory = Math.min(
- roundUp(
- Math.max(r.getMemory(), minimumResource.getMemory()),
- minimumResource.getMemory()),
- maximumResource.getMemory());
+ Resource maximumResource, Resource stepFactor) {
+ int normalizedMemory = normalizeInt(r.getMemory(),
+ minimumResource.getMemory(), maximumResource.getMemory(),
+ stepFactor.getMemory());
return Resources.createResource(normalizedMemory);
}
@Override
- public Resource roundUp(Resource r, Resource minimumResource) {
+ public Resource roundUp(Resource r, Resource stepFactor) {
return Resources.createResource(
- roundUp(r.getMemory(),minimumResource.getMemory())
+ roundUp(r.getMemory(), stepFactor.getMemory())
);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java
index 2f66990..eaa7370 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java
@@ -124,26 +124,22 @@ public Resource divideAndCeil(Resource numerator, int denominator) {
@Override
public Resource normalize(Resource r, Resource minimumResource,
- Resource maximumResource) {
- int normalizedMemory = Math.min(
- roundUp(
- Math.max(r.getMemory(), minimumResource.getMemory()),
- minimumResource.getMemory()),
- maximumResource.getMemory());
- int normalizedCores = Math.min(
- roundUp(
- Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
- minimumResource.getVirtualCores()),
- maximumResource.getVirtualCores());
+ Resource maximumResource, Resource stepFactor) {
+ int normalizedMemory = normalizeInt(r.getMemory(),
+ minimumResource.getMemory(), maximumResource.getMemory(),
+ stepFactor.getMemory());
+ int normalizedCores = normalizeInt(r.getVirtualCores(),
+ minimumResource.getVirtualCores(), maximumResource.getVirtualCores(),
+ stepFactor.getVirtualCores());
return Resources.createResource(normalizedMemory,
normalizedCores);
}
@Override
- public Resource roundUp(Resource r, Resource minimumResource) {
+ public Resource roundUp(Resource r, Resource stepFactor) {
return Resources.createResource(
- roundUp(r.getMemory(), minimumResource.getMemory()),
- roundUp(r.getVirtualCores(), minimumResource.getVirtualCores())
+ roundUp(r.getMemory(), stepFactor.getMemory()),
+ roundUp(r.getVirtualCores(), stepFactor.getVirtualCores())
);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java
index b2dd19b..ad4fead 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java
@@ -51,6 +51,22 @@ public static int roundDown(int a, int b) {
return (a / b) * b;
}
+ public static int normalizeInt(int value, int min, int max, int increment) {
+ if (value > min) {
+ if (value < max) {
+ value = roundUp(value, increment);
+ if (value > max) {
+ value = max;
+ }
+ } else {
+ value = max;
+ }
+ } else {
+ value = min;
+ }
+ return value;
+ }
+
/**
* Compute the number of containers which can be allocated given
* available and required resources.
@@ -94,10 +110,11 @@ public abstract Resource multiplyAndNormalizeDown(
* @param r resource
* @param minimumResource step-factor
* @param maximumResource the upper bound of the resource to be allocated
+ * @param stepFactor the increment for resources to be allocated
* @return normalized resource
*/
public abstract Resource normalize(Resource r, Resource minimumResource,
- Resource maximumResource);
+ Resource maximumResource, Resource stepFactor);
/**
* Round-up resource r given factor stepFactor.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
index 58cd676..5e67098 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
@@ -132,9 +132,9 @@ public static Resource multiplyAndRoundDown(Resource lhs, double by) {
}
public static Resource normalize(
- ResourceCalculator calculator, Resource lhs, Resource factor,
- Resource limit) {
- return calculator.normalize(lhs, factor, limit);
+ ResourceCalculator calculator, Resource lhs, Resource min,
+ Resource max, Resource increment) {
+ return calculator.normalize(lhs, min, max, increment);
}
public static Resource roundUp(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index c0a54c7..0426761 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -90,11 +90,12 @@ public static void normalizeRequests(
ResourceCalculator resourceCalculator,
Resource clusterResource,
Resource minimumResource,
- Resource maximumResource) {
+ Resource maximumResource,
+ Resource incrementResource) {
for (ResourceRequest ask : asks) {
normalizeRequest(
ask, resourceCalculator, clusterResource, minimumResource,
- maximumResource);
+ maximumResource, incrementResource);
}
}
@@ -107,11 +108,12 @@ public static void normalizeRequest(
ResourceCalculator resourceCalculator,
Resource clusterResource,
Resource minimumResource,
- Resource maximumResource) {
+ Resource maximumResource,
+ Resource incrementResource) {
Resource normalized =
Resources.normalize(
resourceCalculator, ask.getCapability(), minimumResource,
- maximumResource);
+ maximumResource, incrementResource);
ask.setCapability(normalized);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index f084649..cfa9631 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -80,6 +80,14 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
public Resource getMaximumResourceCapability();
/**
+ * Get increment allocatable {@link Resource}.
+ * @return increment allocatable resource
+ */
+ @Public
+ @Stable
+ public Resource getIncrementResourceCapability();
+
+ /**
* Get the number of nodes available in the cluster.
* @return the number of available nodes.
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index aca2a12..5dce033 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -177,6 +177,11 @@ public Resource getMaximumResourceCapability() {
}
@Override
+ public Resource getIncrementResourceCapability() {
+ return minimumAllocation;
+ }
+
+ @Override
public Comparator getApplicationComparator() {
return applicationComparator;
}
@@ -484,7 +489,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
// Sanity check
SchedulerUtils.normalizeRequests(
ask, calculator, getClusterResources(), minimumAllocation,
- maximumAllocation);
+ maximumAllocation, minimumAllocation);
// Release containers
for (ContainerId releasedContainerId : release) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index f8cff0e..6c31276 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -115,6 +115,7 @@
private RMContext rmContext;
private Resource minimumAllocation;
private Resource maximumAllocation;
+ private Resource incrAllocation;
private QueueManager queueMgr;
private Clock clock;
@@ -526,6 +527,11 @@ public Resource getMaximumResourceCapability() {
return maximumAllocation;
}
+ @Override
+ public Resource getIncrementResourceCapability() {
+ return incrAllocation;
+ }
+
public double getNodeLocalityThreshold() {
return nodeLocalityThreshold;
}
@@ -730,7 +736,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
// Sanity check
SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
- clusterCapacity, minimumAllocation, maximumAllocation);
+ clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation);
// Release containers
for (ContainerId releasedContainerId : release) {
@@ -988,6 +994,7 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
this.conf = new FairSchedulerConfiguration(conf);
minimumAllocation = this.conf.getMinimumAllocation();
maximumAllocation = this.conf.getMaximumAllocation();
+ incrAllocation = this.conf.getIncrementAllocation();
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index 5696746..39b35b5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -102,6 +102,17 @@ public Resource getMaximumAllocation() {
return Resources.createResource(mem, cpu);
}
+ public Resource getIncrementAllocation() {
+ Resource minimum = getMinimumAllocation();
+ int incrementMemory = getInt(
+ YarnConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ minimum.getMemory());
+ int incrementCores = getInt(
+ YarnConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES,
+ minimum.getVirtualCores());
+ return Resources.createResource(incrementMemory, incrementCores);
+ }
+
public boolean getUserAsDefaultQueue() {
return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 2f4e70d..1e3015c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -196,6 +196,11 @@ public Resource getMaximumResourceCapability() {
}
@Override
+ public Resource getIncrementResourceCapability() {
+ return minimumAllocation;
+ }
+
+ @Override
public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException
{
@@ -232,7 +237,8 @@ public Allocation allocate(
// Sanity check
SchedulerUtils.normalizeRequests(ask, resourceCalculator,
- clusterResource, minimumAllocation, maximumAllocation);
+ clusterResource, minimumAllocation, maximumAllocation,
+ minimumAllocation);
// Release containers
for (ContainerId releasedContainer : release) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
index f17da43..fcb4866 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
@@ -42,45 +42,47 @@ public void testNormalizeRequest() {
final int minMemory = 1024;
final int maxMemory = 8192;
+ final int incrMemory = 256;
Resource minResource = Resources.createResource(minMemory, 0);
Resource maxResource = Resources.createResource(maxMemory, 0);
+ Resource incrResource = Resources.createResource(incrMemory, 0);
ResourceRequest ask = new ResourceRequestPBImpl();
// case negative memory
ask.setCapability(Resources.createResource(-1024));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
- maxResource);
+ maxResource, incrResource);
assertEquals(minMemory, ask.getCapability().getMemory());
// case zero memory
ask.setCapability(Resources.createResource(0));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
- maxResource);
+ maxResource, incrResource);
assertEquals(minMemory, ask.getCapability().getMemory());
// case memory is a multiple of minMemory
ask.setCapability(Resources.createResource(2 * minMemory));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
- maxResource);
+ maxResource, incrResource);
assertEquals(2 * minMemory, ask.getCapability().getMemory());
// case memory is not a multiple of minMemory
ask.setCapability(Resources.createResource(minMemory + 10));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
- maxResource);
- assertEquals(2 * minMemory, ask.getCapability().getMemory());
+ maxResource, incrResource);
+ assertEquals(minMemory + incrMemory, ask.getCapability().getMemory());
// case memory is equal to max allowed
ask.setCapability(Resources.createResource(maxMemory));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
- maxResource);
+ maxResource, incrResource);
assertEquals(maxMemory, ask.getCapability().getMemory());
// case memory is just less than max
ask.setCapability(Resources.createResource(maxMemory - 10));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
- maxResource);
+ maxResource, incrResource);
assertEquals(maxMemory, ask.getCapability().getMemory());
// max is not a multiple of min
@@ -88,14 +90,14 @@ public void testNormalizeRequest() {
ask.setCapability(Resources.createResource(maxMemory - 100));
// multiple of minMemory > maxMemory, then reduce to maxMemory
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
- maxResource);
+ maxResource, incrResource);
assertEquals(maxResource.getMemory(), ask.getCapability().getMemory());
// ask is more than max
maxResource = Resources.createResource(maxMemory, 0);
ask.setCapability(Resources.createResource(maxMemory + 100));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
- maxResource);
+ maxResource, incrResource);
assertEquals(maxResource.getMemory(), ask.getCapability().getMemory());
}
@@ -105,6 +107,7 @@ public void testNormalizeRequestWithDominantResourceCalculator() {
Resource minResource = Resources.createResource(1024, 1);
Resource maxResource = Resources.createResource(10240, 10);
+ Resource incrResource = Resources.createResource(256, 2);
Resource clusterResource = Resources.createResource(10 * 1024, 10);
ResourceRequest ask = new ResourceRequestPBImpl();
@@ -112,24 +115,27 @@ public void testNormalizeRequestWithDominantResourceCalculator() {
// case negative memory/vcores
ask.setCapability(Resources.createResource(-1024, -1));
SchedulerUtils.normalizeRequest(
- ask, resourceCalculator, clusterResource, minResource, maxResource);
+ ask, resourceCalculator, clusterResource, minResource, maxResource,
+ incrResource);
assertEquals(minResource, ask.getCapability());
// case zero memory/vcores
ask.setCapability(Resources.createResource(0, 0));
SchedulerUtils.normalizeRequest(
- ask, resourceCalculator, clusterResource, minResource, maxResource);
+ ask, resourceCalculator, clusterResource, minResource, maxResource,
+ incrResource);
assertEquals(minResource, ask.getCapability());
assertEquals(1, ask.getCapability().getVirtualCores());
assertEquals(1024, ask.getCapability().getMemory());
// case non-zero memory & zero cores
- ask.setCapability(Resources.createResource(1536, 0));
+ ask.setCapability(Resources.createResource(1200, 0));
SchedulerUtils.normalizeRequest(
- ask, resourceCalculator, clusterResource, minResource, maxResource);
- assertEquals(Resources.createResource(2048, 1), ask.getCapability());
+ ask, resourceCalculator, clusterResource, minResource, maxResource,
+ incrResource);
+ assertEquals(Resources.createResource(1280, 1), ask.getCapability());
assertEquals(1, ask.getCapability().getVirtualCores());
- assertEquals(2048, ask.getCapability().getMemory());
+ assertEquals(1280, ask.getCapability().getMemory());
}
@Test (timeout = 30000)
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index e2c753f..b055be6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -236,6 +236,7 @@ public void testLoadConfigurationOnInitialize() throws IOException {
conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 128);
scheduler.reinitialize(conf, resourceManager.getRMContext());
Assert.assertEquals(true, scheduler.assignMultiple);
Assert.assertEquals(3, scheduler.maxAssign);
@@ -244,6 +245,7 @@ public void testLoadConfigurationOnInitialize() throws IOException {
Assert.assertEquals(.7, scheduler.rackLocalityThreshold, .01);
Assert.assertEquals(1024, scheduler.getMaximumResourceCapability().getMemory());
Assert.assertEquals(512, scheduler.getMinimumResourceCapability().getMemory());
+ Assert.assertEquals(128, scheduler.getIncrementResourceCapability().getMemory());
}
@Test