diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 34f1e935dde..083954630f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -402,6 +402,17 @@ public static boolean isAclEnabled(Configuration conf) {
public static final boolean
DEFAULT_OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = false;
+ /**
+ * Maximum number of opportunistic containers to be allocated in
+ * AM heartbeat.
+ */
+ @Unstable
+ public static final String
+ OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT =
+ RM_PREFIX + "opportunistic.max.container-allocation.per.am.heartbeat";
+ public static final int
+ DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT = -1;
+
/** Number of nodes to be used by the Opportunistic Container allocator for
* dispatching containers during container allocation. */
@Unstable
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 004af7c394f..9cbf8991e66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3333,6 +3333,15 @@
false
+
+
+ Maximum number of opportunistic containers to be allocated per
+ Application Master heartbeat.
+
+ yarn.resourcemanager.opportunistic.max.container-allocation.per.am.heartbeat
+ -1
+
+
Number of nodes to be used by the Opportunistic Container Allocator for
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 5600aa80dbb..aec1d9b2d96 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.scheduler;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
@@ -70,6 +71,8 @@
private static final int RACK_LOCAL_LOOP = 1;
private static final int OFF_SWITCH_LOOP = 2;
+ private int maxAllocationsPerAMHeartbeat = -1;
+
/**
* This class encapsulates application specific parameters used to build a
* Container.
@@ -291,6 +294,24 @@ public OpportunisticContainerAllocator(
this.tokenSecretManager = tokenSecretManager;
}
+ /**
+ * Create a new Opportunistic Container Allocator.
+ * @param tokenSecretManager TokenSecretManager
+ * @param maxAllocationsPerAMHeartbeat max number of containers to be
+ * allocated in one AM heartbeat
+ */
+ public OpportunisticContainerAllocator(
+ BaseContainerTokenSecretManager tokenSecretManager,
+ int maxAllocationsPerAMHeartbeat) {
+ this.tokenSecretManager = tokenSecretManager;
+ this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat;
+ }
+
+ @VisibleForTesting
+ void setMaxAllocationsPerAMHeartbeat(int maxAllocationsPerAMHeartbeat) {
+ this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat;
+ }
+
/**
* Allocate OPPORTUNISTIC containers.
* @param blackList Resource BlackList Request
@@ -316,7 +337,6 @@ public OpportunisticContainerAllocator(
// Add OPPORTUNISTIC requests to the outstanding ones.
opportContext.addToOutstandingReqs(oppResourceReqs);
-
Set nodeBlackList = new HashSet<>(opportContext.getBlacklist());
List allocatedContainers = new ArrayList<>();
@@ -333,9 +353,21 @@ public OpportunisticContainerAllocator(
// might be different than what is requested, which is why
// we need the requested capability (key) to match against
// the outstanding reqs)
+ int remAllocs = -1;
+ if (maxAllocationsPerAMHeartbeat > 0) {
+ remAllocs =
+ maxAllocationsPerAMHeartbeat - allocatedContainers.size()
+ - getTotalAllocations(allocations);
+ if (remAllocs <= 0) {
+ LOG.info("Not allocating more containers as we have reached max "
+ + "allocations per AM heartbeat {}",
+ maxAllocationsPerAMHeartbeat);
+ break;
+ }
+ }
Map> allocation = allocate(
rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
- appSubmitter, nodeBlackList);
+ appSubmitter, nodeBlackList, remAllocs);
if (allocation.size() > 0) {
allocations.add(allocation);
continueLoop = true;
@@ -355,16 +387,42 @@ public OpportunisticContainerAllocator(
return allocatedContainers;
}
+ private int getTotalAllocations(
+ List