diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 315044d..a33635e 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -153,6 +153,12 @@
+
+
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 3bc2e9b..03fc40e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -117,7 +117,8 @@ public RMActiveServiceContext(Dispatcher rmDispatcher,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
- RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+ RMApplicationHistoryWriter rmApplicationHistoryWriter,
+ ResourceScheduler scheduler) {
this();
this.setContainerAllocationExpirer(containerAllocationExpirer);
this.setAMLivelinessMonitor(amLivelinessMonitor);
@@ -128,6 +129,7 @@ public RMActiveServiceContext(Dispatcher rmDispatcher,
this.setNMTokenSecretManager(nmTokenSecretManager);
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
+ this.setScheduler(scheduler);
RMStateStore nullStore = new NullRMStateStore();
nullStore.setRMDispatcher(rmDispatcher);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index ebf2fe4..1d0d6c0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -87,18 +87,46 @@ public RMContextImpl(Dispatcher rmDispatcher,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
- RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+ RMApplicationHistoryWriter rmApplicationHistoryWriter,
+ ResourceScheduler scheduler) {
this();
this.setDispatcher(rmDispatcher);
setActiveServiceContext(new RMActiveServiceContext(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
delegationTokenRenewer, appTokenSecretManager,
containerTokenSecretManager, nmTokenSecretManager,
- clientToAMTokenSecretManager, rmApplicationHistoryWriter));
+ clientToAMTokenSecretManager, rmApplicationHistoryWriter,
+ scheduler));
ConfigurationProvider provider = new LocalConfigurationProvider();
setConfigurationProvider(provider);
}
+
+ @VisibleForTesting
+ // helper constructor for tests
+ public RMContextImpl(Dispatcher rmDispatcher,
+ ContainerAllocationExpirer containerAllocationExpirer,
+ AMLivelinessMonitor amLivelinessMonitor,
+ AMLivelinessMonitor amFinishingMonitor,
+ DelegationTokenRenewer delegationTokenRenewer,
+ AMRMTokenSecretManager appTokenSecretManager,
+ RMContainerTokenSecretManager containerTokenSecretManager,
+ NMTokenSecretManagerInRM nmTokenSecretManager,
+ ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
+ RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+ this(
+ rmDispatcher,
+ containerAllocationExpirer,
+ amLivelinessMonitor,
+ amFinishingMonitor,
+ delegationTokenRenewer,
+ appTokenSecretManager,
+ containerTokenSecretManager,
+ nmTokenSecretManager,
+ clientToAMTokenSecretManager,
+ rmApplicationHistoryWriter,
+ null);
+ }
@Override
public Dispatcher getDispatcher() {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 624aa18..fbcaab9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -239,4 +240,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
RMAppMetrics getRMAppMetrics();
ReservationId getReservationId();
+
+ ResourceRequest getAMResourceRequest();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 33b62fe..2d1737a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1339,6 +1339,11 @@ public void setSystemClock(Clock clock) {
public ReservationId getReservationId() {
return submissionContext.getReservationID();
}
+
+ @Override
+ public ResourceRequest getAMResourceRequest() {
+ return this.amReq;
+ }
protected Credentials parseCredentials() throws IOException {
Credentials credentials = new Credentials();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 0a2fa3a..f458057 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -109,30 +109,6 @@ public static float computeAbsoluteMaximumCapacity(
}
return absoluteMaxCapacityByNodeLabels;
}
-
- public static int computeMaxActiveApplications(
- ResourceCalculator calculator,
- Resource clusterResource, Resource minimumAllocation,
- float maxAMResourcePercent, float absoluteMaxCapacity) {
- return
- Math.max(
- (int)Math.ceil(
- Resources.ratio(
- calculator,
- clusterResource,
- minimumAllocation) *
- maxAMResourcePercent * absoluteMaxCapacity
- ),
- 1);
- }
-
- public static int computeMaxActiveApplicationsPerUser(
- int maxActiveApplications, int userLimit, float userLimitFactor) {
- return Math.max(
- (int)Math.ceil(
- maxActiveApplications * (userLimit / 100.0f) * userLimitFactor),
- 1);
- }
@Lock(CSQueue.class)
public static void updateQueueStatistics(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 47679a6..01dcfa9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -87,9 +87,6 @@
protected int maxApplicationsPerUser;
private float maxAMResourcePerQueuePercent;
- private int maxActiveApplications; // Based on absolute max capacity
- private int maxActiveAppsUsingAbsCap; // Based on absolute capacity
- private int maxActiveApplicationsPerUser;
private int nodeLocalityDelay;
@@ -115,6 +112,8 @@
private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
+ private final Resource usedAMResources = Resource.newInstance(0, 0);
+
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
@@ -149,19 +148,6 @@ public LeafQueue(CapacitySchedulerContext cs,
float maxAMResourcePerQueuePercent = cs.getConfiguration()
.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
- int maxActiveApplications =
- CSQueueUtils.computeMaxActiveApplications(
- resourceCalculator,
- cs.getClusterResource(), this.minimumAllocation,
- maxAMResourcePerQueuePercent, absoluteMaxCapacity);
- this.maxActiveAppsUsingAbsCap =
- CSQueueUtils.computeMaxActiveApplications(
- resourceCalculator,
- cs.getClusterResource(), this.minimumAllocation,
- maxAMResourcePerQueuePercent, absoluteCapacity);
- int maxActiveApplicationsPerUser =
- CSQueueUtils.computeMaxActiveApplicationsPerUser(
- maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
this.queueInfo.setChildQueues(new ArrayList());
@@ -173,8 +159,7 @@ public LeafQueue(CapacitySchedulerContext cs,
setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
- maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
- .getConfiguration().getNodeLocalityDelay(), accessibleLabels,
+ state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels,
defaultLabelExpression, this.capacitiyByNodeLabels,
this.maxCapacityByNodeLabels,
cs.getConfiguration().getReservationContinueLook());
@@ -202,8 +187,7 @@ protected synchronized void setupQueueConfigs(
float maximumCapacity, float absoluteMaxCapacity,
int userLimit, float userLimitFactor,
int maxApplications, float maxAMResourcePerQueuePercent,
- int maxApplicationsPerUser, int maxActiveApplications,
- int maxActiveApplicationsPerUser, QueueState state,
+ int maxApplicationsPerUser, QueueState state,
Map acls, int nodeLocalityDelay,
Set labels, String defaultLabelExpression,
Map capacitieByLabel,
@@ -228,9 +212,6 @@ protected synchronized void setupQueueConfigs(
this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
this.maxApplicationsPerUser = maxApplicationsPerUser;
- this.maxActiveApplications = maxActiveApplications;
- this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser;
-
if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
this.defaultLabelExpression)) {
throw new IOException("Invalid default label expression of "
@@ -282,21 +263,6 @@ protected synchronized void setupQueueConfigs(
"maxApplicationsPerUser = " + maxApplicationsPerUser +
" [= (int)(maxApplications * (userLimit / 100.0f) * " +
"userLimitFactor) ]" + "\n" +
- "maxActiveApplications = " + maxActiveApplications +
- " [= max(" +
- "(int)ceil((clusterResourceMemory / minimumAllocation) * " +
- "maxAMResourcePerQueuePercent * absoluteMaxCapacity)," +
- "1) ]" + "\n" +
- "maxActiveAppsUsingAbsCap = " + maxActiveAppsUsingAbsCap +
- " [= max(" +
- "(int)ceil((clusterResourceMemory / minimumAllocation) *" +
- "maxAMResourcePercent * absoluteCapacity)," +
- "1) ]" + "\n" +
- "maxActiveApplicationsPerUser = " + maxActiveApplicationsPerUser +
- " [= max(" +
- "(int)(maxActiveApplications * (userLimit / 100.0f) * " +
- "userLimitFactor)," +
- "1) ]" + "\n" +
"usedCapacity = " + usedCapacity +
" [= usedResourcesMemory / " +
"(clusterResourceMemory * absoluteCapacity)]" + "\n" +
@@ -349,14 +315,6 @@ public synchronized int getMaxApplicationsPerUser() {
return maxApplicationsPerUser;
}
- public synchronized int getMaximumActiveApplications() {
- return maxActiveApplications;
- }
-
- public synchronized int getMaximumActiveApplicationsPerUser() {
- return maxActiveApplicationsPerUser;
- }
-
@Override
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
@@ -519,8 +477,6 @@ public synchronized void reinitialize(
newlyParsedLeafQueue.maxApplications,
newlyParsedLeafQueue.maxAMResourcePerQueuePercent,
newlyParsedLeafQueue.getMaxApplicationsPerUser(),
- newlyParsedLeafQueue.getMaximumActiveApplications(),
- newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
newlyParsedLeafQueue.getNodeLocalityDelay(),
newlyParsedLeafQueue.accessibleLabels,
@@ -606,27 +562,83 @@ public void submitApplication(ApplicationId applicationId, String userName,
}
}
+
+ @VisibleForTesting
+ protected Resource getAMResourceLimit() {
+ return Resources.multiply(
+ lastClusterResource,
+ maxAMResourcePerQueuePercent * absoluteMaxCapacity);
+ }
+
+ @VisibleForTesting
+ protected Resource getUserAMResourceLimit() {
+ return Resources.multiply(
+ lastClusterResource,
+ ( maxAMResourcePerQueuePercent * (userLimit / 100.0f) *
+ userLimitFactor )
+ * absoluteMaxCapacity);//this one should be just absoluteCapacity
+ }
private synchronized void activateApplications() {
+ //limit of allowed resource usage for application masters
+ Resource amLimit = getAMResourceLimit();
+ Resource userAMLimit = getUserAMResourceLimit();
+
for (Iterator i=pendingApplications.iterator();
i.hasNext(); ) {
FiCaSchedulerApp application = i.next();
- // Check queue limit
- if (getNumActiveApplications() >= getMaximumActiveApplications()) {
- break;
+ // Check am resource limit
+ Resource amIfStarted =
+ Resources.add(application.getAMResource(), usedAMResources);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("application AMResource " + application.getAMResource() +
+ " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent +
+ " amLimit " + amLimit +
+ " lastClusterResource " + lastClusterResource +
+ " amIfStarted " + amIfStarted);
}
- // Check user limit
+ if (!Resources.fitsIn(amIfStarted, amLimit)) {
+ if (getNumActiveApplications() < 1) {
+ LOG.warn("maximum-am-resource-percent is insufficient to start a" +
+ " single application in queue, it is likely set too low." +
+ " skipping enforcement to allow at least one application to start");
+ } else {
+ LOG.info("not starting application as amIfStarted exceeds amLimit");
+ continue;
+ }
+ }
+
+ // Check user am resource limit
+
User user = getUser(application.getUser());
- if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) {
- user.activateApplication();
- activeApplications.add(application);
- i.remove();
- LOG.info("Application " + application.getApplicationId() +
- " from user: " + application.getUser() +
- " activated in queue: " + getQueueName());
+
+ Resource userAmIfStarted =
+ Resources.add(application.getAMResource(),
+ user.getConsumedAMResources());
+
+ if (!Resources.fitsIn(userAmIfStarted, userAMLimit)) {
+ if (getNumActiveApplications() < 1) {
+ LOG.warn("maximum-am-resource-percent is insufficient to start a" +
+ " single application in queue for user, it is likely set too low." +
+ " skipping enforcement to allow at least one application to start");
+ } else {
+ LOG.info("not starting application as amIfStarted exceeds " +
+ "userAmLimit");
+ continue;
+ }
}
+ user.activateApplication();
+ activeApplications.add(application);
+ Resources.addTo(usedAMResources, application.getAMResource());
+ Resources.addTo(user.getConsumedAMResources(),
+ application.getAMResource());
+ i.remove();
+ LOG.info("Application " + application.getApplicationId() +
+ " from user: " + application.getUser() +
+ " activated in queue: " + getQueueName());
}
}
@@ -672,6 +684,10 @@ public synchronized void removeApplicationAttempt(
boolean wasActive = activeApplications.remove(application);
if (!wasActive) {
pendingApplications.remove(application);
+ } else {
+ Resources.subtractFrom(usedAMResources, application.getAMResource());
+ Resources.subtractFrom(user.getConsumedAMResources(),
+ application.getAMResource());
}
applicationAttemptMap.remove(application.getApplicationAttemptId());
@@ -1728,21 +1744,6 @@ synchronized void releaseResource(Resource clusterResource,
public synchronized void updateClusterResource(Resource clusterResource) {
lastClusterResource = clusterResource;
- // Update queue properties
- maxActiveApplications =
- CSQueueUtils.computeMaxActiveApplications(
- resourceCalculator,
- clusterResource, minimumAllocation,
- maxAMResourcePerQueuePercent, absoluteMaxCapacity);
- maxActiveAppsUsingAbsCap =
- CSQueueUtils.computeMaxActiveApplications(
- resourceCalculator,
- clusterResource, minimumAllocation,
- maxAMResourcePerQueuePercent, absoluteCapacity);
- maxActiveApplicationsPerUser =
- CSQueueUtils.computeMaxActiveApplicationsPerUser(
- maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
-
// Update metrics
CSQueueUtils.updateQueueStatistics(
resourceCalculator, this, getParent(), clusterResource,
@@ -1764,6 +1765,7 @@ public synchronized void updateClusterResource(Resource clusterResource) {
@VisibleForTesting
public static class User {
Resource consumed = Resources.createResource(0, 0);
+ Resource consumedAMResources = Resources.createResource(0, 0);
Map consumedByLabel = new HashMap();
int pendingApplications = 0;
int activeApplications = 0;
@@ -1787,6 +1789,10 @@ public int getPendingApplications() {
public int getActiveApplications() {
return activeApplications;
}
+
+ public Resource getConsumedAMResources() {
+ return consumedAMResources;
+ }
public int getTotalApplications() {
return getPendingApplications() + getActiveApplications();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 2f9569c..9f97b13 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
/**
* Represents an application attempt from the viewpoint of the FIFO or Capacity
@@ -72,6 +73,20 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
+
+ RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
+
+ Resource amResource;
+ if (rmApp == null || rmApp.getAMResourceRequest() == null) {
+ //the rmApp may be undefined (the resource manager checks for this too)
+ //and unmanaged applications do not provide an amResource request
+ //in these cases, provide a default using the scheduler
+ amResource = rmContext.getScheduler().getMinimumResourceCapability();
+ } else {
+ amResource = rmApp.getAMResourceRequest().getCapability();
+ }
+
+ setAMResource(amResource);
}
synchronized public boolean containerCompleted(RMContainer rmContainer,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
index 89b4a78..5786129 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
@@ -114,8 +114,6 @@ protected void render(Block html) {
_("Num Containers:", Integer.toString(lqinfo.getNumContainers())).
_("Max Applications:", Integer.toString(lqinfo.getMaxApplications())).
_("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())).
- _("Max Schedulable Applications:", Integer.toString(lqinfo.getMaxActiveApplications())).
- _("Max Schedulable Applications Per User:", Integer.toString(lqinfo.getMaxActiveApplicationsPerUser())).
_("Configured Capacity:", percent(lqinfo.getCapacity() / 100)).
_("Configured Max Capacity:", percent(lqinfo.getMaxCapacity() / 100)).
_("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
index d90e963..639c515 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
@@ -32,8 +32,6 @@
protected int numContainers;
protected int maxApplications;
protected int maxApplicationsPerUser;
- protected int maxActiveApplications;
- protected int maxActiveApplicationsPerUser;
protected int userLimit;
protected UsersInfo users; // To add another level in the XML
protected float userLimitFactor;
@@ -48,8 +46,6 @@
numContainers = q.getNumContainers();
maxApplications = q.getMaxApplications();
maxApplicationsPerUser = q.getMaxApplicationsPerUser();
- maxActiveApplications = q.getMaximumActiveApplications();
- maxActiveApplicationsPerUser = q.getMaximumActiveApplicationsPerUser();
userLimit = q.getUserLimit();
users = new UsersInfo(q.getUsers());
userLimitFactor = q.getUserLimitFactor();
@@ -75,14 +71,6 @@ public int getMaxApplicationsPerUser() {
return maxApplicationsPerUser;
}
- public int getMaxActiveApplications() {
- return maxActiveApplications;
- }
-
- public int getMaxActiveApplicationsPerUser() {
- return maxActiveApplicationsPerUser;
- }
-
public int getUserLimit() {
return userLimit;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 62e3e5c..f8d92aa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -51,6 +52,7 @@
public abstract class MockAsm extends MockApps {
public static class ApplicationBase implements RMApp {
+ ResourceRequest amReq;
@Override
public String getUser() {
throw new UnsupportedOperationException("Not supported yet.");
@@ -183,6 +185,11 @@ public RMAppMetrics getRMAppMetrics() {
public ReservationId getReservationId() {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+ @Override
+ public ResourceRequest getAMResourceRequest() {
+ return this.amReq;
+ }
}
public static RMApp newApplication(int i) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
index e93d351..f4cb3b3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
@@ -23,6 +23,7 @@
import org.junit.Assert;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -41,6 +42,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -55,6 +57,13 @@ public void setUp() {
dispatcher = new DrainDispatcher();
this.rm = new MockRM() {
@Override
+ public void init(Configuration conf) {
+ conf.set(
+ CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
+ "1.0");
+ super.init(conf);
+ }
+ @Override
protected EventHandler createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler) {
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
index c7513ab..b8663f6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
@@ -82,6 +82,7 @@ public void setUp() throws Exception {
.thenReturn(null);
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
when(spyRMContext.getRMApps()).thenReturn(spyApps);
+ when(spyRMContext.getScheduler()).thenReturn(scheduler);
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 787b5d7..ec990f9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -55,6 +56,7 @@
StringBuilder diagnostics = new StringBuilder();
RMAppAttempt attempt;
int maxAppAttempts = 1;
+ ResourceRequest amReq;
public MockRMApp(int newid, long time, RMAppState newState) {
finish = time;
@@ -264,4 +266,9 @@ public RMAppMetrics getRMAppMetrics() {
public ReservationId getReservationId() {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+ @Override
+ public ResourceRequest getAMResourceRequest() {
+ return this.amReq;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 0cd74d0..12fa5cf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -28,16 +28,21 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -47,6 +52,7 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -56,6 +62,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.Ignore;
public class TestApplicationLimits {
@@ -119,8 +126,6 @@ public void setUp() throws IOException {
// Some default values
doReturn(100).when(queue).getMaxApplications();
doReturn(25).when(queue).getMaxApplicationsPerUser();
- doReturn(10).when(queue).getMaximumActiveApplications();
- doReturn(2).when(queue).getMaximumActiveApplicationsPerUser();
}
private static final String A = "a";
@@ -138,8 +143,13 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
LOG.info("Setup top-level queues a and b");
}
-
+
private FiCaSchedulerApp getMockApplication(int appId, String user) {
+ return getMockApplication(appId, user, Resource.newInstance(0, 0));
+ }
+
+ private FiCaSchedulerApp getMockApplication(int appId, String user,
+ Resource amResource) {
FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
ApplicationAttemptId applicationAttemptId =
TestUtils.getMockApplicationAttemptId(appId, 0);
@@ -147,164 +157,11 @@ private FiCaSchedulerApp getMockApplication(int appId, String user) {
when(application).getApplicationId();
doReturn(applicationAttemptId). when(application).getApplicationAttemptId();
doReturn(user).when(application).getUser();
+ doReturn(amResource).when(application).getAMResource();
return application;
}
@Test
- public void testLimitsComputation() throws Exception {
- CapacitySchedulerConfiguration csConf =
- new CapacitySchedulerConfiguration();
- setupQueueConfiguration(csConf);
- YarnConfiguration conf = new YarnConfiguration();
-
- CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
- when(csContext.getConfiguration()).thenReturn(csConf);
- when(csContext.getConf()).thenReturn(conf);
- when(csContext.getMinimumResourceCapability()).
- thenReturn(Resources.createResource(GB, 1));
- when(csContext.getMaximumResourceCapability()).
- thenReturn(Resources.createResource(16*GB, 16));
- when(csContext.getApplicationComparator()).
- thenReturn(CapacityScheduler.applicationComparator);
- when(csContext.getQueueComparator()).
- thenReturn(CapacityScheduler.queueComparator);
- when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
- when(csContext.getRMContext()).thenReturn(rmContext);
-
- // Say cluster has 100 nodes of 16G each
- Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16);
- when(csContext.getClusterResource()).thenReturn(clusterResource);
-
- Map queues = new HashMap();
- CSQueue root =
- CapacityScheduler.parseQueue(csContext, csConf, null, "root",
- queues, queues, TestUtils.spyHook);
-
- LeafQueue queue = (LeafQueue)queues.get(A);
-
- LOG.info("Queue 'A' -" +
- " maxActiveApplications=" + queue.getMaximumActiveApplications() +
- " maxActiveApplicationsPerUser=" +
- queue.getMaximumActiveApplicationsPerUser());
- int expectedMaxActiveApps =
- Math.max(1,
- (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) *
- csConf.
- getMaximumApplicationMasterResourcePerQueuePercent(
- queue.getQueuePath()) *
- queue.getAbsoluteMaximumCapacity()));
- assertEquals(expectedMaxActiveApps,
- queue.getMaximumActiveApplications());
- int expectedMaxActiveAppsUsingAbsCap =
- Math.max(1,
- (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) *
- csConf.getMaximumApplicationMasterResourcePercent() *
- queue.getAbsoluteCapacity()));
- assertEquals(
- (int)Math.ceil(
- expectedMaxActiveAppsUsingAbsCap * (queue.getUserLimit() / 100.0f) *
- queue.getUserLimitFactor()),
- queue.getMaximumActiveApplicationsPerUser());
- assertEquals(
- (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()),
- queue.getMetrics().getAvailableMB()
- );
-
- // Add some nodes to the cluster & test new limits
- clusterResource = Resources.createResource(120 * 16 * GB);
- root.updateClusterResource(clusterResource);
- expectedMaxActiveApps =
- Math.max(1,
- (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) *
- csConf.
- getMaximumApplicationMasterResourcePerQueuePercent(
- queue.getQueuePath()) *
- queue.getAbsoluteMaximumCapacity()));
- assertEquals(expectedMaxActiveApps,
- queue.getMaximumActiveApplications());
- expectedMaxActiveAppsUsingAbsCap =
- Math.max(1,
- (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) *
- csConf.getMaximumApplicationMasterResourcePercent() *
- queue.getAbsoluteCapacity()));
- assertEquals(
- (int)Math.ceil(expectedMaxActiveAppsUsingAbsCap *
- (queue.getUserLimit() / 100.0f) * queue.getUserLimitFactor()),
- queue.getMaximumActiveApplicationsPerUser());
- assertEquals(
- (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()),
- queue.getMetrics().getAvailableMB()
- );
-
- // should return -1 if per queue setting not set
- assertEquals(
- (int)CapacitySchedulerConfiguration.UNDEFINED,
- csConf.getMaximumApplicationsPerQueue(queue.getQueuePath()));
- int expectedMaxApps =
- (int)
- (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS *
- queue.getAbsoluteCapacity());
- assertEquals(expectedMaxApps, queue.getMaxApplications());
-
- int expectedMaxAppsPerUser = (int)(expectedMaxApps *
- (queue.getUserLimit()/100.0f) * queue.getUserLimitFactor());
- assertEquals(expectedMaxAppsPerUser, queue.getMaxApplicationsPerUser());
-
- // should default to global setting if per queue setting not set
- assertEquals(
- (long)CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT,
- (long)csConf.getMaximumApplicationMasterResourcePerQueuePercent(
- queue.getQueuePath())
- );
-
- // Change the per-queue max AM resources percentage.
- csConf.setFloat(
- "yarn.scheduler.capacity." +
- queue.getQueuePath() +
- ".maximum-am-resource-percent",
- 0.5f);
- // Re-create queues to get new configs.
- queues = new HashMap();
- root =
- CapacityScheduler.parseQueue(csContext, csConf, null, "root",
- queues, queues, TestUtils.spyHook);
- clusterResource = Resources.createResource(100 * 16 * GB);
-
- queue = (LeafQueue)queues.get(A);
- expectedMaxActiveApps =
- Math.max(1,
- (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) *
- csConf.
- getMaximumApplicationMasterResourcePerQueuePercent(
- queue.getQueuePath()) *
- queue.getAbsoluteMaximumCapacity()));
-
- assertEquals((long) 0.5,
- (long) csConf.getMaximumApplicationMasterResourcePerQueuePercent(queue.getQueuePath()));
- assertEquals(expectedMaxActiveApps,
- queue.getMaximumActiveApplications());
-
- // Change the per-queue max applications.
- csConf.setInt(
- "yarn.scheduler.capacity." +
- queue.getQueuePath() +
- ".maximum-applications", 9999);
- // Re-create queues to get new configs.
- queues = new HashMap();
- root =
- CapacityScheduler.parseQueue(csContext, csConf, null, "root",
- queues, queues, TestUtils.spyHook);
-
- queue = (LeafQueue)queues.get(A);
- assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(queue.getQueuePath()));
- assertEquals(9999, queue.getMaxApplications());
-
- expectedMaxAppsPerUser = (int)(9999 *
- (queue.getUserLimit()/100.0f) * queue.getUserLimitFactor());
- assertEquals(expectedMaxAppsPerUser, queue.getMaxApplicationsPerUser());
- }
-
- @Test
public void testActiveApplicationLimits() throws Exception {
final String user_0 = "user_0";
final String user_1 = "user_1";
@@ -349,9 +206,6 @@ public void testActiveApplicationLimits() throws Exception {
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(1, queue.getNumPendingApplications(user_0));
- // Change queue limit to be smaller so 2 users can fill it up
- doReturn(3).when(queue).getMaximumActiveApplications();
-
// Submit first app for user_1
FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1);
queue.submitApplicationAttempt(app_4, user_1);
@@ -381,6 +235,170 @@ public void testActiveApplicationLimits() throws Exception {
assertEquals(1, queue.getNumActiveApplications(user_1));
assertEquals(0, queue.getNumPendingApplications(user_1));
}
+
+ @Test
+ public void testUserAMResourceLimitAccumulated() throws Exception {
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ csConf.setUserLimit(CapacitySchedulerConfiguration.ROOT + "." + A, 50);
+ setupQueueConfiguration(csConf);
+ YarnConfiguration conf = new YarnConfiguration();
+
+ CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
+ when(csContext.getConfiguration()).thenReturn(csConf);
+ when(csContext.getConf()).thenReturn(conf);
+ when(csContext.getMinimumResourceCapability()).
+ thenReturn(Resources.createResource(GB, 1));
+ when(csContext.getMaximumResourceCapability()).
+ thenReturn(Resources.createResource(16*GB, 16));
+ when(csContext.getApplicationComparator()).
+ thenReturn(CapacityScheduler.applicationComparator);
+ when(csContext.getQueueComparator()).
+ thenReturn(CapacityScheduler.queueComparator);
+ when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
+
+ Resource clusterResource = Resources.createResource(40 * GB, 20);
+ when(csContext.getClusterResource()).thenReturn(clusterResource);
+
+ Map queues = new HashMap();
+ CapacityScheduler.parseQueue(csContext, csConf, null, "root",
+ queues, queues, TestUtils.spyHook);
+
+ LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A));
+
+ final String user_0 = "user_0";
+ final String user_1 = "user_1";
+
+ RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+ RMContext rmContext = TestUtils.getMockRMContext();
+ RMContext spyRMContext = spy(rmContext);
+
+ ConcurrentMap spyApps =
+ spy(new ConcurrentHashMap());
+ RMApp rmApp = mock(RMApp.class);
+ ResourceRequest amResourceRequest = mock(ResourceRequest.class);
+ Resource amResource = Resources.createResource(0, 0);
+ when(amResourceRequest.getCapability()).thenReturn(amResource);
+ when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
+ Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
+ when(spyRMContext.getRMApps()).thenReturn(spyApps);
+
+ // This uses the default 10% of cluster value for the max am resources
+ // which are allowed, at 40GB = 4GB for AM's, two can run at a time
+ // due to user limit, one per user
+ queue.updateClusterResource(Resource.newInstance(40 * GB, 20));
+
+ assertEquals(Resource.newInstance(4 * GB, 2), queue.getAMResourceLimit());
+ assertEquals(Resource.newInstance(2 * GB, 1),
+ queue.getUserAMResourceLimit());
+
+ int APPLICATION_ID = 0;
+ // Submit first application
+ FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0,
+ Resource.newInstance(2 * GB, 1));
+ queue.submitApplicationAttempt(app_0, user_0);
+ assertEquals(1, queue.getNumActiveApplications());
+ assertEquals(0, queue.getNumPendingApplications());
+ assertEquals(1, queue.getNumActiveApplications(user_0));
+ assertEquals(0, queue.getNumPendingApplications(user_0));
+
+ // Submit second application, will not start due to user amlimit
+ FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
+ Resource.newInstance(2 * GB, 1));
+ queue.submitApplicationAttempt(app_1, user_0);
+ assertEquals(1, queue.getNumActiveApplications());
+ assertEquals(1, queue.getNumPendingApplications());
+ assertEquals(1, queue.getNumActiveApplications(user_0));
+ assertEquals(1, queue.getNumPendingApplications(user_0));
+
+ // Submit third application, will start, as is diff user
+ FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_1,
+ Resource.newInstance(2 * GB, 1));
+ queue.submitApplicationAttempt(app_2, user_1);
+ assertEquals(2, queue.getNumActiveApplications());
+ assertEquals(1, queue.getNumPendingApplications());
+ assertEquals(1, queue.getNumActiveApplications(user_0));
+ assertEquals(1, queue.getNumActiveApplications(user_1));
+ }
+
+ @Test
+ public void testAMResourceLimit() throws Exception {
+ final String user_0 = "user_0";
+
+ // This uses the default 10% of cluster value for the max am resources
+ // which are allowed, at 20GB = 2GB for AM's, each of our apps below has
+ // a 2GB AM, so only one may run at a time
+ queue.updateClusterResource(Resource.newInstance(20 * GB, 10));
+
+ assertEquals(Resource.newInstance(2 * GB, 1), queue.getAMResourceLimit());
+
+ int APPLICATION_ID = 0;
+ // Submit first application
+ FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0,
+ Resource.newInstance(2 * GB, 1));
+ queue.submitApplicationAttempt(app_0, user_0);
+ assertEquals(1, queue.getNumActiveApplications());
+ assertEquals(0, queue.getNumPendingApplications());
+ assertEquals(1, queue.getNumActiveApplications(user_0));
+ assertEquals(0, queue.getNumPendingApplications(user_0));
+
+ // Submit second application
+ FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
+ Resource.newInstance(2 * GB, 1));
+ queue.submitApplicationAttempt(app_1, user_0);
+ assertEquals(1, queue.getNumActiveApplications());
+ assertEquals(1, queue.getNumPendingApplications());
+ assertEquals(1, queue.getNumActiveApplications(user_0));
+ assertEquals(1, queue.getNumPendingApplications(user_0));
+
+ // Now finish first app so second should be activated
+ queue.finishApplicationAttempt(app_0, A);
+ assertEquals(1, queue.getNumActiveApplications());
+ assertEquals(0, queue.getNumPendingApplications());
+ assertEquals(1, queue.getNumActiveApplications(user_0));
+ assertEquals(0, queue.getNumPendingApplications(user_0));
+ }
+
+ @Test
+ public void testAMResourceLimitAccumulated() throws Exception {
+ final String user_0 = "user_0";
+
+ // This uses the default 10% of cluster value for the max am resources
+ // which are allowed, at 40GB = 4GB for AM's, two can run at a time
+ queue.updateClusterResource(Resource.newInstance(40 * GB, 20));
+
+ assertEquals(Resource.newInstance(4 * GB, 2), queue.getAMResourceLimit());
+
+ int APPLICATION_ID = 0;
+ // Submit first application
+ FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0,
+ Resource.newInstance(2 * GB, 1));
+ queue.submitApplicationAttempt(app_0, user_0);
+ assertEquals(1, queue.getNumActiveApplications());
+ assertEquals(0, queue.getNumPendingApplications());
+ assertEquals(1, queue.getNumActiveApplications(user_0));
+ assertEquals(0, queue.getNumPendingApplications(user_0));
+
+ // Submit second application
+ FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
+ Resource.newInstance(2 * GB, 1));
+ queue.submitApplicationAttempt(app_1, user_0);
+ assertEquals(2, queue.getNumActiveApplications());
+ assertEquals(0, queue.getNumPendingApplications());
+ assertEquals(2, queue.getNumActiveApplications(user_0));
+ assertEquals(0, queue.getNumPendingApplications(user_0));
+
+ // Submit third application, will not start, insufficient resources
+ FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0,
+ Resource.newInstance(2 * GB, 1));
+ queue.submitApplicationAttempt(app_2, user_0);
+ assertEquals(2, queue.getNumActiveApplications());
+ assertEquals(1, queue.getNumPendingApplications());
+ assertEquals(2, queue.getNumActiveApplications(user_0));
+ assertEquals(1, queue.getNumPendingApplications(user_0));
+ }
@Test
public void testActiveLimitsWithKilledApps() throws Exception {
@@ -388,9 +406,6 @@ public void testActiveLimitsWithKilledApps() throws Exception {
int APPLICATION_ID = 0;
- // set max active to 2
- doReturn(2).when(queue).getMaximumActiveApplications();
-
// Submit first application
FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplicationAttempt(app_0, user_0);
@@ -506,6 +521,18 @@ public void testHeadroom() throws Exception {
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
RMContext rmContext = TestUtils.getMockRMContext();
+ RMContext spyRMContext = spy(rmContext);
+
+ ConcurrentMap spyApps =
+ spy(new ConcurrentHashMap());
+ RMApp rmApp = mock(RMApp.class);
+ ResourceRequest amResourceRequest = mock(ResourceRequest.class);
+ Resource amResource = Resources.createResource(0, 0);
+ when(amResourceRequest.getCapability()).thenReturn(amResource);
+ when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
+ Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
+ when(spyRMContext.getRMApps()).thenReturn(spyApps);
+
Priority priority_1 = TestUtils.createMockPriority(1);
@@ -515,7 +542,7 @@ public void testHeadroom() throws Exception {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0_0 =
spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue,
- queue.getActiveUsersManager(), rmContext));
+ queue.getActiveUsersManager(), spyRMContext));
queue.submitApplicationAttempt(app_0_0, user_0);
List app_0_0_requests = new ArrayList();
@@ -534,7 +561,7 @@ public void testHeadroom() throws Exception {
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_0_1 =
spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue,
- queue.getActiveUsersManager(), rmContext));
+ queue.getActiveUsersManager(), spyRMContext));
queue.submitApplicationAttempt(app_0_1, user_0);
List app_0_1_requests = new ArrayList();
@@ -553,7 +580,7 @@ public void testHeadroom() throws Exception {
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_1_0 =
spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue,
- queue.getActiveUsersManager(), rmContext));
+ queue.getActiveUsersManager(), spyRMContext));
queue.submitApplicationAttempt(app_1_0, user_1);
List app_1_0_requests = new ArrayList();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index fb7bb2c..88984bf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -124,6 +124,10 @@ public void setUp() throws Exception {
spy(new ConcurrentHashMap());
RMApp rmApp = mock(RMApp.class);
when(rmApp.getRMAppAttempt((ApplicationAttemptId)Matchers.any())).thenReturn(null);
+ ResourceRequest amResourceRequest = mock(ResourceRequest.class);
+ Resource amResource = Resources.createResource(0, 0);
+ when(amResourceRequest.getCapability()).thenReturn(amResource);
+ when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
when(spyRMContext.getRMApps()).thenReturn(spyApps);
@@ -265,26 +269,39 @@ public Container answer(InvocationOnMock invocation)
@Test
public void testInitializeQueue() throws Exception {
- final float epsilon = 1e-5f;
- //can add more sturdy test with 3-layer queues
- //once MAPREDUCE:3410 is resolved
- LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
- assertEquals(0.085, a.getCapacity(), epsilon);
- assertEquals(0.085, a.getAbsoluteCapacity(), epsilon);
- assertEquals(0.2, a.getMaximumCapacity(), epsilon);
- assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
+ final float epsilon = 1e-5f;
+ //can add more sturdy test with 3-layer queues
+ //once MAPREDUCE:3410 is resolved
+ LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+ assertEquals(0.085, a.getCapacity(), epsilon);
+ assertEquals(0.085, a.getAbsoluteCapacity(), epsilon);
+ assertEquals(0.2, a.getMaximumCapacity(), epsilon);
+ assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
+
+ LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
+ assertEquals(0.80, b.getCapacity(), epsilon);
+ assertEquals(0.80, b.getAbsoluteCapacity(), epsilon);
+ assertEquals(0.99, b.getMaximumCapacity(), epsilon);
+ assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
+
+ ParentQueue c = (ParentQueue)queues.get(C);
+ assertEquals(0.015, c.getCapacity(), epsilon);
+ assertEquals(0.015, c.getAbsoluteCapacity(), epsilon);
+ assertEquals(0.1, c.getMaximumCapacity(), epsilon);
+ assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
+
+ //Verify the value for getAMResourceLimit for queues with < .1 maxcap
+ Resource clusterResource = Resource.newInstance(50 * GB, 50);
- LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
- assertEquals(0.80, b.getCapacity(), epsilon);
- assertEquals(0.80, b.getAbsoluteCapacity(), epsilon);
- assertEquals(0.99, b.getMaximumCapacity(), epsilon);
- assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
-
- ParentQueue c = (ParentQueue)queues.get(C);
- assertEquals(0.015, c.getCapacity(), epsilon);
- assertEquals(0.015, c.getAbsoluteCapacity(), epsilon);
- assertEquals(0.1, c.getMaximumCapacity(), epsilon);
- assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
+ a.updateClusterResource(clusterResource);
+ assertEquals(Resources.multiply(clusterResource,
+ a.getAbsoluteMaximumCapacity() * a.getMaxAMResourcePerQueuePercent()),
+ a.getAMResourceLimit());
+
+ b.updateClusterResource(clusterResource);
+ assertEquals(Resources.multiply(clusterResource,
+ b.getAbsoluteMaximumCapacity() * b.getMaxAMResourcePerQueuePercent()),
+ b.getAMResourceLimit());
}
@Test
@@ -679,7 +696,7 @@ public void testComputeUserLimitAndSetHeadroom(){
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
- qb.getActiveUsersManager(), rmContext);
+ qb.getActiveUsersManager(), spyRMContext);
qb.submitApplicationAttempt(app_0, user_0);
Priority u0Priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
@@ -702,7 +719,7 @@ public void testComputeUserLimitAndSetHeadroom(){
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
- qb.getActiveUsersManager(), rmContext);
+ qb.getActiveUsersManager(), spyRMContext);
Priority u1Priority = TestUtils.createMockPriority(2);
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
@@ -736,12 +753,12 @@ public void testComputeUserLimitAndSetHeadroom(){
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
- qb.getActiveUsersManager(), rmContext);
+ qb.getActiveUsersManager(), spyRMContext);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
- qb.getActiveUsersManager(), rmContext);
+ qb.getActiveUsersManager(), spyRMContext);
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
u0Priority, recordFactory)));
@@ -764,7 +781,7 @@ public void testComputeUserLimitAndSetHeadroom(){
TestUtils.getMockApplicationAttemptId(4, 0);
FiCaSchedulerApp app_4 =
new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
- qb.getActiveUsersManager(), rmContext);
+ qb.getActiveUsersManager(), spyRMContext);
qb.submitApplicationAttempt(app_4, user_0);
app_4.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
@@ -2291,20 +2308,16 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh()
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 80);
LeafQueue a = new LeafQueue(csContext, A, root, null);
assertEquals(0.1f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
- assertEquals(160, a.getMaximumActiveApplications());
csConf.setFloat(CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.2f);
LeafQueue newA = new LeafQueue(csContext, A, root, null);
a.reinitialize(newA, clusterResource);
assertEquals(0.2f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
- assertEquals(320, a.getMaximumActiveApplications());
Resource newClusterResource = Resources.createResource(100 * 20 * GB,
100 * 32);
a.updateClusterResource(newClusterResource);
- // 100 * 20 * 0.2 = 400
- assertEquals(400, a.getMaximumActiveApplications());
}
@Test
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 2a49545..985609e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -77,6 +77,7 @@
.getRecordFactory(null);
RMContext rmContext;
+ RMContext spyRMContext;
CapacityScheduler cs;
// CapacitySchedulerConfiguration csConf;
CapacitySchedulerContext csContext;
@@ -132,7 +133,10 @@ private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
root = CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
- cs.setRMContext(rmContext);
+ spyRMContext = spy(rmContext);
+ when(spyRMContext.getScheduler()).thenReturn(cs);
+
+ cs.setRMContext(spyRMContext);
cs.init(csConf);
cs.start();
}
@@ -212,14 +216,14 @@ public void testReservation() throws Exception {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes
@@ -361,14 +365,14 @@ public void testReservationNoContinueLook() throws Exception {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes
@@ -506,14 +510,14 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes
@@ -618,7 +622,7 @@ public void testGetAppToUnreserve() throws Exception {
.getMockApplicationAttemptId(0, 0);
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
String host_0 = "host_0";
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
@@ -685,7 +689,7 @@ public void testFindNodeToUnreserve() throws Exception {
.getMockApplicationAttemptId(0, 0);
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
String host_1 = "host_1";
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
@@ -742,14 +746,14 @@ public void testAssignToQueue() throws Exception {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes
@@ -916,14 +920,14 @@ public void testAssignToUser() throws Exception {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes
@@ -1042,14 +1046,14 @@ public void testReservationsNoneAvailable() throws Exception {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index b4c4c10..3918bf7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -143,13 +143,14 @@ public void testFifoSchedulerCapacityWhenNoNMs() {
@Test(timeout=5000)
public void testAppAttemptMetrics() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
+
+ FifoScheduler scheduler = new FifoScheduler();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null,
- null, null, null, null, null, null, null, writer);
+ null, null, null, null, null, null, null, writer, scheduler);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
- FifoScheduler scheduler = new FifoScheduler();
Configuration conf = new Configuration();
scheduler.setRMContext(rmContext);
scheduler.init(conf);
@@ -189,12 +190,14 @@ public void testNodeLocalAssignment() throws Exception {
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.rollMasterKey();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+
+ FifoScheduler scheduler = new FifoScheduler();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
- null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
+ null, containerTokenSecretManager, nmTokenSecretManager, null, writer,
+ scheduler);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
- FifoScheduler scheduler = new FifoScheduler();
scheduler.setRMContext(rmContext);
scheduler.init(conf);
scheduler.start();
@@ -260,17 +263,19 @@ public void testUpdateResourceOnNode() throws Exception {
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.rollMasterKey();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
- RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
- null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
- ((RMContextImpl) rmContext).setSystemMetricsPublisher(
- mock(SystemMetricsPublisher.class));
-
+
FifoScheduler scheduler = new FifoScheduler(){
@SuppressWarnings("unused")
public Map getNodes(){
return nodes;
}
};
+ RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
+ null, containerTokenSecretManager, nmTokenSecretManager, null, writer,
+ scheduler);
+ ((RMContextImpl) rmContext).setSystemMetricsPublisher(
+ mock(SystemMetricsPublisher.class));
+
scheduler.setRMContext(rmContext);
scheduler.init(conf);
scheduler.start();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
index c7c403d..ef7435a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
@@ -82,8 +82,6 @@
int numContainers;
int maxApplications;
int maxApplicationsPerUser;
- int maxActiveApplications;
- int maxActiveApplicationsPerUser;
int userLimit;
float userLimitFactor;
}
@@ -303,10 +301,6 @@ public void verifySubQueueXML(Element qElem, String q,
WebServicesTestUtils.getXmlInt(qElem, "maxApplications");
lqi.maxApplicationsPerUser =
WebServicesTestUtils.getXmlInt(qElem, "maxApplicationsPerUser");
- lqi.maxActiveApplications =
- WebServicesTestUtils.getXmlInt(qElem, "maxActiveApplications");
- lqi.maxActiveApplicationsPerUser =
- WebServicesTestUtils.getXmlInt(qElem, "maxActiveApplicationsPerUser");
lqi.userLimit = WebServicesTestUtils.getXmlInt(qElem, "userLimit");
lqi.userLimitFactor =
WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor");
@@ -386,8 +380,6 @@ private void verifySubQueue(JSONObject info, String q,
lqi.numContainers = info.getInt("numContainers");
lqi.maxApplications = info.getInt("maxApplications");
lqi.maxApplicationsPerUser = info.getInt("maxApplicationsPerUser");
- lqi.maxActiveApplications = info.getInt("maxActiveApplications");
- lqi.maxActiveApplicationsPerUser = info.getInt("maxActiveApplicationsPerUser");
lqi.userLimit = info.getInt("userLimit");
lqi.userLimitFactor = (float) info.getDouble("userLimitFactor");
verifyLeafQueueGeneric(q, lqi);
@@ -449,10 +441,6 @@ private void verifyLeafQueueGeneric(String q, LeafQueueInfo info)
(float)expectedMaxAppsPerUser,
(float)info.maxApplicationsPerUser, info.userLimitFactor);
- assertTrue("maxActiveApplications doesn't match",
- info.maxActiveApplications > 0);
- assertTrue("maxActiveApplicationsPerUser doesn't match",
- info.maxActiveApplicationsPerUser > 0);
assertEquals("userLimit doesn't match", csConf.getUserLimit(q),
info.userLimit);
assertEquals("userLimitFactor doesn't match",