diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 10f57745ddd..85903bf4280 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -68,6 +68,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Times;
@@ -418,7 +420,8 @@ private RMAppImpl createAndPopulateNewRMApp(
// We only replace the queue when it's a new application
if (!isRecovery) {
- replaceQueueFromPlacementContext(placementContext, submissionContext);
+ copyPlacementQueueToSubmissionContext(placementContext,
+ submissionContext);
// fail the submission if configured application timeout value is invalid
RMServerUtils.validateApplicationTimeouts(
@@ -443,38 +446,60 @@ private RMAppImpl createAndPopulateNewRMApp(
submissionContext.setPriority(appPriority);
}
- // Since FairScheduler queue mapping is done inside scheduler,
- // if FairScheduler is used and the queue doesn't exist, we should not
- // fail here because queue will be created inside FS. Ideally, FS queue
- // mapping should be done outside scheduler too like CS.
- // For now, exclude FS for the acl check.
- if (!isRecovery && YarnConfiguration.isAclEnabled(conf)
- && scheduler instanceof CapacityScheduler) {
- String queueName = submissionContext.getQueue();
- String appName = submissionContext.getApplicationName();
- CSQueue csqueue = ((CapacityScheduler) scheduler).getQueue(queueName);
-
- if (csqueue == null && placementContext != null) {
- //could be an auto created queue through queue mapping. Validate
- // parent queue exists and has valid acls
- String parentQueueName = placementContext.getParentQueue();
- csqueue = ((CapacityScheduler) scheduler).getQueue(parentQueueName);
- }
+ if (!isRecovery && YarnConfiguration.isAclEnabled(conf)) {
+ if (scheduler instanceof CapacityScheduler) {
+ String queueName = submissionContext.getQueue();
+ String appName = submissionContext.getApplicationName();
+ CSQueue csqueue = ((CapacityScheduler) scheduler).getQueue(queueName);
+
+ if (csqueue == null && placementContext != null) {
+ //could be an auto created queue through queue mapping. Validate
+ // parent queue exists and has valid acls
+ String parentQueueName = placementContext.getParentQueue();
+ csqueue = ((CapacityScheduler) scheduler).getQueue(parentQueueName);
+ }
- if (csqueue != null
- && !authorizer.checkPermission(
- new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
- SchedulerUtils.toAccessType(QueueACL.SUBMIT_APPLICATIONS),
- applicationId.toString(), appName, Server.getRemoteAddress(),
- null))
- && !authorizer.checkPermission(
- new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
- SchedulerUtils.toAccessType(QueueACL.ADMINISTER_QUEUE),
- applicationId.toString(), appName, Server.getRemoteAddress(),
- null))) {
- throw RPCUtil.getRemoteException(new AccessControlException(
- "User " + user + " does not have permission to submit "
- + applicationId + " to queue " + submissionContext.getQueue()));
+ if (csqueue != null
+ && !authorizer.checkPermission(
+ new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
+ SchedulerUtils.toAccessType(QueueACL.SUBMIT_APPLICATIONS),
+ applicationId.toString(), appName, Server.getRemoteAddress(),
+ null))
+ && !authorizer.checkPermission(
+ new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
+ SchedulerUtils.toAccessType(QueueACL.ADMINISTER_QUEUE),
+ applicationId.toString(), appName, Server.getRemoteAddress(),
+ null))) {
+ throw RPCUtil.getRemoteException(new AccessControlException(
+ "User " + user + " does not have permission to submit "
+ + applicationId + " to queue "
+ + submissionContext.getQueue()));
+ }
+ }
+ if (scheduler instanceof FairScheduler) {
+ // if we have not placed the app just skip this, the submit will be
+ // rejected in the scheduler.
+ if (placementContext != null) {
+ // The queue might not be created yet. Walk up the tree to check the
+ // parent ACL. The queueName is assured root which always exists
+ String queueName = submissionContext.getQueue();
+ FSQueue queue = ((FairScheduler) scheduler).getQueueManager().
+ getQueue(queueName);
+ while (queue == null) {
+ int sepIndex = queueName.lastIndexOf(".");
+ queueName = queueName.substring(0, sepIndex);
+ queue = ((FairScheduler) scheduler).getQueueManager().
+ getQueue(queueName);
+ }
+ if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) &&
+ !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
+ throw RPCUtil.getRemoteException(new AccessControlException(
+ "User " + user + " does not have permission to submit " +
+ applicationId + " to queue " +
+ submissionContext.getQueue() +
+ " denied by ACL for queue " + queueName));
+ }
+ }
}
}
@@ -841,34 +866,39 @@ ApplicationPlacementContext placeApplication(
// Placement could also fail if the user doesn't exist in system
// skip if the user is not found during recovery.
if (isRecovery) {
- LOG.warn("PlaceApplication failed,skipping on recovery of rm");
+ LOG.warn("Application placement failed for user " + user +
+ " and application " + context.getApplicationId() +
+ ", skipping placement on recovery of rm");
+ LOG.debug("Exception that caused the placement failure", e);
return placementContext;
}
throw e;
}
}
- if (placementContext == null && (context.getQueue() == null) || context
- .getQueue().isEmpty()) {
+ // The submission context when created often has a queue set. In case of
+ // the FairScheduler a null placement context is still considered as a
+ // failure, even when a queue is provided on submit. This case handled in
+ // the scheduler.
+ if (placementContext == null && (context.getQueue() == null) ||
+ context.getQueue().isEmpty()) {
String msg = "Failed to place application " + context.getApplicationId()
- + " to queue and specified " + "queue is invalid : " + context
- .getQueue();
+ + " in a queue and submit context queue is null or empty";
LOG.error(msg);
throw new YarnException(msg);
}
return placementContext;
}
- void replaceQueueFromPlacementContext(
+ private void copyPlacementQueueToSubmissionContext(
ApplicationPlacementContext placementContext,
ApplicationSubmissionContext context) {
- // Set it to ApplicationSubmissionContext
- //apply queue mapping only to new application submissions
+ // Set the queue from the placement in the ApplicationSubmissionContext
+ // Placement rule are only considered for new applications
if (placementContext != null && !StringUtils.equalsIgnoreCase(
context.getQueue(), placementContext.getQueue())) {
- LOG.info("Placed application=" + context.getApplicationId() +
- " to queue=" + placementContext.getQueue() + ", original queue="
- + context
- .getQueue());
+ LOG.info("Placed application with ID " + context.getApplicationId() +
+ " in queue: " + placementContext.getQueue() +
+ ", original submission queue was: " + context.getQueue());
context.setQueue(placementContext.getQueue());
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/FSPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/FSPlacementRule.java
index 7e9e6ef93dc..7ceb3745d53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/FSPlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/FSPlacementRule.java
@@ -58,10 +58,12 @@ QueueManager getQueueManager() {
}
/**
- * Set a rule to generate the parent queue dynamically.
+ * Set a rule to generate the parent queue dynamically. The parent rule
+ * should only be called on rule creation when the policy is read from the
+ * configuration.
* @param parent A PlacementRule
*/
- void setParentRule(PlacementRule parent) {
+ public void setParentRule(PlacementRule parent) {
this.parentRule = parent;
}
@@ -69,7 +71,8 @@ void setParentRule(PlacementRule parent) {
* Get the rule that is set to generate the parent queue dynamically.
* @return The rule set or null if not set.
*/
- PlacementRule getParentRule() {
+ @VisibleForTesting
+ public PlacementRule getParentRule() {
return parentRule;
}
@@ -149,6 +152,14 @@ boolean configuredQueue(String queueName) {
return (queue != null && !queue.isDynamic());
}
+ /**
+ * Get the create flag as set during the config setup.
+ * @return The value of the {@link #createQueue} flag
+ */
+ public boolean getCreateFlag() {
+ return createQueue;
+ }
+
/**
* Get the create flag from the xml configuration element.
* @param conf The FS configuration element for the queue
@@ -159,7 +170,7 @@ boolean configuredQueue(String queueName) {
boolean getCreateFlag(Element conf) {
if (conf != null) {
String create = conf.getAttribute("create");
- return Boolean.parseBoolean(create);
+ return create.isEmpty() || Boolean.parseBoolean(create);
}
return true;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
index 0b5fe2ebd8e..adee5d7e79a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
@@ -28,7 +28,7 @@
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
/**
- * Utility class for QueuePlacementRule.
+ * Utility class for Capacity Scheduler queue PlacementRules.
*/
public final class QueuePlacementRuleUtils {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
index 826d9f523eb..5bdf96d9e04 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
@@ -23,7 +23,6 @@
import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -107,6 +106,15 @@
private final Set nonPreemptableQueues;
+ /**
+ * Create a fully initialised configuration for the scheduler.
+ * @param queueProperties The list of queues and their properties from the
+ * configuration.
+ * @param allocationFileParser The allocation file parser
+ * @param newPlacementPolicy An initialised queue placement policy.
+ * @param globalReservationQueueConfig The reservation queue config
+ * @throws AllocationConfigurationException
+ */
public AllocationConfiguration(QueueProperties queueProperties,
AllocationFileParser allocationFileParser,
QueuePlacementPolicy newPlacementPolicy,
@@ -145,7 +153,13 @@ public AllocationConfiguration(QueueProperties queueProperties,
queueProperties.getMaxContainerAllocation();
}
- public AllocationConfiguration(Configuration conf) {
+ /**
+ * Create a base scheduler configuration with just the defaults set.
+ * Should only be called to init a basic setup on scheduler init.
+ * @param scheduler The {@link FairScheduler} to create and initialise the
+ * placement policy.
+ */
+ public AllocationConfiguration(FairScheduler scheduler) {
minQueueResources = new HashMap<>();
maxChildQueueResources = new HashMap<>();
maxQueueResources = new HashMap<>();
@@ -170,7 +184,7 @@ public AllocationConfiguration(Configuration conf) {
configuredQueues.put(queueType, new HashSet<>());
}
placementPolicy =
- QueuePlacementPolicy.fromConfiguration(conf, configuredQueues);
+ QueuePlacementPolicy.fromConfiguration(scheduler);
nonPreemptableQueues = new HashSet<>();
queueMaxContainerAllocationMap = new HashMap<>();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index b92bd5dab73..b7d10e8ddb9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -78,6 +78,7 @@
"(?i)(hdfs)|(file)|(s3a)|(viewfs)";
private final Clock clock;
+ private final FairScheduler scheduler;
// Last time we successfully reloaded queues
private volatile long lastSuccessfulReload;
@@ -95,14 +96,15 @@
private Thread reloadThread;
private volatile boolean running = true;
- public AllocationFileLoaderService() {
- this(SystemClock.getInstance());
+ AllocationFileLoaderService(FairScheduler scheduler) {
+ this(SystemClock.getInstance(), scheduler);
}
private List defaultPermissions;
- public AllocationFileLoaderService(Clock clock) {
+ AllocationFileLoaderService(Clock clock, FairScheduler scheduler) {
super(AllocationFileLoaderService.class.getName());
+ this.scheduler = scheduler;
this.clock = clock;
}
@@ -255,9 +257,8 @@ public synchronized void reloadAllocations()
QueueProperties queueProperties = queueParser.parse();
// Load placement policy and pass it configured queues
- Configuration conf = getConfig();
QueuePlacementPolicy newPlacementPolicy =
- getQueuePlacementPolicy(allocationFileParser, queueProperties, conf);
+ getQueuePlacementPolicy(allocationFileParser);
setupRootQueueProperties(allocationFileParser, queueProperties);
ReservationQueueConfiguration globalReservationQueueConfig =
@@ -273,16 +274,14 @@ public synchronized void reloadAllocations()
}
private QueuePlacementPolicy getQueuePlacementPolicy(
- AllocationFileParser allocationFileParser,
- QueueProperties queueProperties, Configuration conf)
+ AllocationFileParser allocationFileParser)
throws AllocationConfigurationException {
if (allocationFileParser.getQueuePlacementPolicy().isPresent()) {
return QueuePlacementPolicy.fromXml(
allocationFileParser.getQueuePlacementPolicy().get(),
- queueProperties.getConfiguredQueues(), conf);
+ scheduler);
} else {
- return QueuePlacementPolicy.fromConfiguration(conf,
- queueProperties.getConfiguredQueues());
+ return QueuePlacementPolicy.fromConfiguration(scheduler);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index a267639702e..366e9c1d918 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -59,6 +59,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -214,7 +215,7 @@
public FairScheduler() {
super(FairScheduler.class.getName());
context = new FSContext(this);
- allocsLoader = new AllocationFileLoaderService();
+ allocsLoader = new AllocationFileLoaderService(this);
queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
}
@@ -223,6 +224,10 @@ public FSContext getContext() {
return context;
}
+ public RMContext getRMContext() {
+ return rmContext;
+ }
+
public boolean isAtLeastReservationThreshold(
ResourceCalculator resourceCalculator, Resource resource) {
return Resources.greaterThanOrEqual(resourceCalculator,
@@ -455,7 +460,21 @@ public int getContinuousSchedulingSleepMs() {
* configured limits, but the app will not be marked as runnable.
*/
protected void addApplication(ApplicationId applicationId,
- String queueName, String user, boolean isAppRecovering) {
+ String queueName, String user, boolean isAppRecovering,
+ ApplicationPlacementContext placementContext) {
+ // If the placement was rejected the placementContext will be null.
+ // We ignore placement rules on recovery.
+ if (!isAppRecovering && placementContext == null) {
+ String message = "Reject application " + applicationId +
+ " submitted by user " + user +
+ " application rejected by placement rules.";
+ rejectApplicationWithMessage(applicationId, message);
+ return;
+ }
+ // If we get here the queue placement has been run and the queueName
+ // reflects that already. If we are recovering the application the queue
+ // was not replaced by the placement rules and the queueName needs to be
+ // sanity checked
if (queueName == null || queueName.isEmpty()) {
String message =
"Reject application " + applicationId + " submitted by user " + user
@@ -475,12 +494,38 @@ protected void addApplication(ApplicationId applicationId,
writeLock.lock();
try {
- RMApp rmApp = rmContext.getRMApps().get(applicationId);
- FSLeafQueue queue = assignToQueue(rmApp, queueName, user, applicationId);
+ // Assign the app to the queue creating and prevent queue delete.
+ FSLeafQueue queue = queueMgr.getLeafQueue(queueName, true,
+ applicationId);
if (queue == null) {
+ rejectApplicationWithMessage(applicationId,
+ queueName + " is not a leaf queue");
+ return;
+ }
+
+ // Enforce ACLs: 2nd check, there could be a time laps between the app
+ // creation in the RMAppManager and getting here. That means we could
+ // have a configuration change (prevent race condition)
+ UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
+ user);
+
+ if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) &&
+ !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
+ String msg = "User " + user + " does not have permission to submit " +
+ applicationId + " to queue " + queueName;
+ rejectApplicationWithMessage(applicationId, msg);
+ queue.removeAssignedApp(applicationId);
return;
}
+ RMApp rmApp = rmContext.getRMApps().get(applicationId);
+ if (rmApp != null) {
+ rmApp.setQueue(queueName);
+ } else {
+ LOG.error("Couldn't find RM app for " + applicationId +
+ " to set queue name on");
+ }
+
if (rmApp != null && rmApp.getAMResourceRequests() != null) {
// Resources.fitsIn would always return false when queueMaxShare is 0
// for any resource, but only using Resources.fitsIn is not enough
@@ -499,7 +544,7 @@ protected void addApplication(ApplicationId applicationId,
+ "it has zero amount of resource for a requested "
+ "resource! Invalid requested AM resources: %s, "
+ "maximum queue resources: %s",
- applicationId, queue.getName(),
+ applicationId, queueName,
invalidAMResourceRequests, queue.getMaxShare());
rejectApplicationWithMessage(applicationId, msg);
queue.removeAssignedApp(applicationId);
@@ -507,27 +552,13 @@ protected void addApplication(ApplicationId applicationId,
}
}
- // Enforce ACLs
- UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
- user);
-
- if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue
- .hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
- String msg = "User " + userUgi.getUserName()
- + " cannot submit applications to queue " + queue.getName()
- + "(requested queuename is " + queueName + ")";
- rejectApplicationWithMessage(applicationId, msg);
- queue.removeAssignedApp(applicationId);
- return;
- }
-
SchedulerApplication application =
- new SchedulerApplication(queue, user);
+ new SchedulerApplication<>(queue, user);
applications.put(applicationId, application);
queue.getMetrics().submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user
- + ", in queue: " + queue.getName()
+ + ", in queue: " + queueName
+ ", currently num of applications: " + applications.size());
if (isAppRecovering) {
if (LOG.isDebugEnabled()) {
@@ -600,60 +631,6 @@ protected void addApplicationAttempt(
}
}
- /**
- * Helper method for the tests to assign the app to a queue.
- */
- @VisibleForTesting
- FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
- return assignToQueue(rmApp, queueName, user, null);
- }
-
- /**
- * Helper method that attempts to assign the app to a queue. The method is
- * responsible to call the appropriate event-handler if the app is rejected.
- */
- private FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user,
- ApplicationId applicationId) {
- FSLeafQueue queue = null;
- String appRejectMsg = null;
-
- try {
- QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
- queueName = placementPolicy.assignAppToQueue(queueName, user);
- if (queueName == null) {
- appRejectMsg = "Application rejected by queue placement policy";
- } else {
- queue = queueMgr.getLeafQueue(queueName, true, applicationId);
- if (queue == null) {
- appRejectMsg = queueName + " is not a leaf queue";
- }
- }
- } catch (IllegalStateException se) {
- appRejectMsg = "Unable to match app " + rmApp.getApplicationId() +
- " to a queue placement policy, and no valid terminal queue " +
- " placement rule is configured. Please contact an administrator " +
- " to confirm that the fair scheduler configuration contains a " +
- " valid terminal queue placement rule.";
- } catch (InvalidQueueNameException qne) {
- appRejectMsg = qne.getMessage();
- } catch (IOException ioe) {
- // IOException should only happen for a user without groups
- appRejectMsg = "Error assigning app to a queue: " + ioe.getMessage();
- }
-
- if (appRejectMsg != null && rmApp != null) {
- rejectApplicationWithMessage(rmApp.getApplicationId(), appRejectMsg);
- return null;
- }
-
- if (rmApp != null) {
- rmApp.setQueue(queue.getName());
- } else {
- LOG.error("Couldn't find RM app to set queue name on");
- }
- return queue;
- }
-
private void removeApplication(ApplicationId applicationId,
RMAppState finalState) {
SchedulerApplication application = applications.remove(
@@ -1271,7 +1248,8 @@ public void handle(SchedulerEvent event) {
if (queueName != null) {
addApplication(appAddedEvent.getApplicationId(),
queueName, appAddedEvent.getUser(),
- appAddedEvent.getIsAppRecovering());
+ appAddedEvent.getIsAppRecovering(),
+ appAddedEvent.getPlacementContext());
}
break;
case APP_REMOVED:
@@ -1448,12 +1426,8 @@ private void initScheduler(Configuration conf) throws IOException {
// This stores per-application scheduling information
this.applications = new ConcurrentHashMap<>();
- allocConf = new AllocationConfiguration(conf);
- try {
- queueMgr.initialize(conf);
- } catch (Exception e) {
- throw new IOException("Failed to start FairScheduler", e);
- }
+ allocConf = new AllocationConfiguration(this);
+ queueMgr.initialize();
if (continuousSchedulingEnabled) {
// Continuous scheduling is deprecated log it on startup
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 6a1f953249d..4fe38d5b0c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -18,34 +18,28 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
-import javax.xml.parsers.ParserConfigurationException;
-
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
-import org.xml.sax.SAXException;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Iterator;
-import java.util.Set;
/**
* Maintains a list of queues as well as scheduling parameters for each queue,
@@ -106,8 +100,7 @@ public FSParentQueue getRootQueue() {
return rootQueue;
}
- public void initialize(Configuration conf) throws IOException,
- SAXException, AllocationConfigurationException, ParserConfigurationException {
+ public void initialize() {
// Policies of root and default queue are set to
// SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been
// loaded yet.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
index 30ea213529d..d63cff83e56 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
@@ -23,85 +23,226 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Groups;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PrimaryGroupPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.RejectPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.SecondaryGroupExistingPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
+import static org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory.getPlacementRule;
+
+/**
+ * The FairScheduler rules based policy for placing an application in a queue.
+ * It parses the configuration and updates the {@link
+ * org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager}
+ * with a list of {@link PlacementRule}s to execute in order.
+ */
@Private
@Unstable
-public class QueuePlacementPolicy {
- private static final Map> ruleClasses;
+public final class QueuePlacementPolicy {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(QueuePlacementPolicy.class);
+
+ // The list of known rules:
+ // key to the map is the name in the configuration.
+ // for each name the mapping contains the class name of the implementation
+ // and a flag (true, false or create) which describes the terminal state
+ // see {@link #getTerminal} comments.
+ private static final Map> RULES;
static {
- Map> map =
- new HashMap>();
- map.put("user", QueuePlacementRule.User.class);
- map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class);
- map.put("secondaryGroupExistingQueue",
- QueuePlacementRule.SecondaryGroupExistingQueue.class);
- map.put("specified", QueuePlacementRule.Specified.class);
- map.put("nestedUserQueue",
- QueuePlacementRule.NestedUserQueue.class);
- map.put("default", QueuePlacementRule.Default.class);
- map.put("reject", QueuePlacementRule.Reject.class);
- ruleClasses = Collections.unmodifiableMap(map);
+ Map> map = new HashMap<>();
+ map.put("user", new ArrayList