diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 41dc72f..e226e37 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -76,6 +76,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
@@ -529,6 +530,12 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
appContext.setApplicationTags(new HashSet(tagsFromConf));
}
+
+ String jobPriority = jobConf.get(MRJobConfig.PRIORITY);
+ if (jobPriority != null) {
+ Priority priority = Priority.newInstance(Integer.parseInt(jobPriority));
+ appContext.setPriority(priority);
+ }
return appContext;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index ff06eea..49b55b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1694,6 +1694,15 @@ private static void addDeprecatedKeys() {
"2000, 500";
/**
+ * Application Priority configurations
+ */
+ public static final String APPLICATION_PRIORITY_PREFIX = YARN_PREFIX
+ + "application-priority.";
+
+ public static final String YARN_CLUSTER_APP_PRIORITY = APPLICATION_PRIORITY_PREFIX
+ + "collection";
+
+ /**
* Flag to indicate if the node labels feature enabled, by default it's
* disabled
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 03fc40e..46b78a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityManager;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
@@ -95,6 +96,7 @@
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private RMNodeLabelsManager nodeLabelManager;
+ private ApplicationPriorityManager applicationPriorityManager;
private long epoch;
private Clock systemClock = new SystemClock();
private long schedulerRecoveryStartTime = 0;
@@ -417,6 +419,18 @@ public RMNodeLabelsManager getNodeLabelManager() {
public void setNodeLabelManager(RMNodeLabelsManager mgr) {
nodeLabelManager = mgr;
}
+
+ @Private
+ @Unstable
+ public ApplicationPriorityManager getApplicationPriorityManager() {
+ return applicationPriorityManager;
+ }
+
+ @Private
+ @Unstable
+ public void setApplicationPriorityManager(ApplicationPriorityManager mgr) {
+ applicationPriorityManager = mgr;
+ }
@Private
@Unstable
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 8dcfe67..928dcb8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -39,6 +40,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
@@ -326,6 +328,28 @@ private RMAppImpl createAndPopulateNewRMApp(
throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext);
+
+ Priority appPriority = submissionContext.getPriority();
+ Integer defaultPriority;
+ if (null != appPriority) {
+ try {
+ rmContext.getApplicationPriorityManager()
+ .authenticateApplicationPriority(submissionContext.getPriority(),
+ user, submissionContext.getQueue(), applicationId);
+ } catch (IOException e) {
+ throw RPCUtil.getRemoteException(e.getMessage());
+ }
+ } else if (null == appPriority
+ && (defaultPriority = rmContext.getApplicationPriorityManager()
+ .getDefaultApplicationPriorityFromQueue(
+ submissionContext.getQueue())) != ApplicationPriorityManager.DEFAULT_PRIORITY) {
+ // Get the default priority from Queue and set to Application
+ LOG.info("Application '" + applicationId
+ + "' is submitted without priority hence default queue priority:"
+ + defaultPriority + " is set.");
+ appPriority = Priority.newInstance(defaultPriority);
+ }
+
// Create RMApp
RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf,
@@ -333,7 +357,7 @@ private RMAppImpl createAndPopulateNewRMApp(
submissionContext.getQueue(),
submissionContext, this.scheduler, this.masterService,
submitTime, submissionContext.getApplicationType(),
- submissionContext.getApplicationTags(), amReq);
+ submissionContext.getApplicationTags(), amReq, appPriority);
// Concurrent app submissions with same applicationId will fail here
// Concurrent app submissions with different applicationIds will not
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index ecf6166..ab3a909 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityManager;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -116,6 +117,10 @@ void setRMApplicationHistoryWriter(
RMNodeLabelsManager getNodeLabelManager();
public void setNodeLabelManager(RMNodeLabelsManager mgr);
+
+ ApplicationPriorityManager getApplicationPriorityManager();
+
+ public void setApplicationPriorityManager(ApplicationPriorityManager mgr);
long getEpoch();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 1d0d6c0..1b50dd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityManager;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -400,6 +401,16 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) {
activeServiceContext.setNodeLabelManager(mgr);
}
+ @Override
+ public ApplicationPriorityManager getApplicationPriorityManager() {
+ return activeServiceContext.getApplicationPriorityManager();
+ }
+
+ @Override
+ public void setApplicationPriorityManager(ApplicationPriorityManager mgr) {
+ activeServiceContext.setApplicationPriorityManager(mgr);
+ }
+
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 8bd8e21..e195d91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -57,6 +58,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityManager;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
@@ -337,6 +339,11 @@ protected RMNodeLabelsManager createNodeLabelManager()
return new RMNodeLabelsManager();
}
+ protected ApplicationPriorityManager createApplicationPriorityManager()
+ throws InstantiationException, IllegalAccessException {
+ return new ApplicationPriorityManager();
+ }
+
protected DelegationTokenRenewer createDelegationTokenRenewer() {
return new DelegationTokenRenewer();
}
@@ -426,6 +433,11 @@ protected void serviceInit(Configuration configuration) throws Exception {
nlm.setRMContext(rmContext);
addService(nlm);
rmContext.setNodeLabelManager(nlm);
+
+ ApplicationPriorityManager apm = createApplicationPriorityManager();
+ apm.setRMContext(rmContext);
+ addService(apm);
+ rmContext.setApplicationPriorityManager(apm);
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityManager.java
new file mode 100644
index 0000000..697725f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityManager.java
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+public class ApplicationPriorityManager extends AbstractService {
+
+ protected static final Log LOG = LogFactory
+ .getLog(ApplicationPriorityManager.class);
+ public static final String EMPTY_QUEUE_NAME = "";
+ public static final Integer DEFAULT_PRIORITY = 0;
+
+ protected Set clusterPriorityCollections = new HashSet();
+
+ protected final ReadLock readLock;
+ protected final WriteLock writeLock;
+
+ private static Integer MAX_PRIORITY_VALUE = 65535;
+ private static Integer MIN_PRIORITY_VALUE = -65535;
+
+ private RMContext rmContext = null;
+
+ // Map to queueName vs Queue object which has priority labels
+ protected ConcurrentMap queueCollections = new ConcurrentHashMap();
+
+ public ApplicationPriorityManager() {
+ super(ApplicationPriorityManager.class.getName());
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
+
+ // Adding DEFAULT_PRIORITY by default to cluster set to avoid default
+ // issues.
+ clusterPriorityCollections.add(Priority.newInstance(DEFAULT_PRIORITY));
+ }
+
+ protected static class Queue {
+ private String queueName;
+ public ApplicationPriorityPerQueue applicationPriorityPerQueue;
+
+ protected Queue() {
+ this.setQueueName(EMPTY_QUEUE_NAME);
+ this.applicationPriorityPerQueue = null;
+ }
+
+ public Queue(String queueName,
+ ApplicationPriorityPerQueue applicationPriorityPerQueue) {
+ this.setQueueName(queueName);
+ this.applicationPriorityPerQueue = applicationPriorityPerQueue;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+
+ // config file can have cluster priority label config, read and update.
+ String[] configPrLabels = conf
+ .getTrimmedStrings(YarnConfiguration.YARN_CLUSTER_APP_PRIORITY);
+
+ getPriorityFromConfig(configPrLabels);
+ }
+
+ private void getPriorityFromConfig(String[] configPrLabels)
+ throws IOException {
+ if (configPrLabels == null || configPrLabels.length == 0) {
+ LOG.info("Empty configuration for application priority collection in clustger level");
+ return;
+ }
+ Set priorityList = new HashSet<>();
+ try {
+ for (String priority : configPrLabels) {
+ priorityList.add(Integer.parseInt(priority));
+ }
+ } catch (NumberFormatException e) {
+ LOG.error("Invalid priority configuration");
+ return;
+ }
+
+ if (!priorityList.isEmpty()) {
+ addToClusterApplicationPriorities(priorityList);
+ }
+ }
+
+ public void authenticateApplicationPriority(Priority priority, String user,
+ String queueName, ApplicationId applicationId) throws IOException {
+ try {
+ readLock.lock();
+
+ // verify whether 'priority' is present in cluster
+ if (!clusterPriorityCollections.contains(priority)) {
+ throw new IOException("Priority =" + priority.getPriority()
+ + " is not present in cluster level");
+ }
+
+ Queue queue = queueCollections.get(queueName);
+ if (null == queue) {
+ throw new IOException("Invalid queue name " + queueName);
+ }
+
+ // Verify whether submitted priority is lesser than max priority
+ // in the queue.
+ if (priority.getPriority() > queue.applicationPriorityPerQueue
+ .getMaxApplicationPriority()) {
+ throw new IOException("Invalid priority as Queue: " + queueName
+ + " cannot support more than priority '"
+ + queue.applicationPriorityPerQueue.getMaxApplicationPriority()
+ + "'");
+ }
+
+ LOG.info("Submitted priority '" + priority.getPriority()
+ + "' is acceptable in queue :" + queueName + "for application:"
+ + applicationId);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public Integer getDefaultApplicationPriorityFromQueue(String queueName) {
+ try {
+ readLock.lock();
+ Queue currentQueue = null;
+ if (this.queueCollections.containsKey(queueName)) {
+ currentQueue = this.queueCollections.get(queueName);
+ } else {
+ return DEFAULT_PRIORITY;
+ }
+ return currentQueue.applicationPriorityPerQueue
+ .getDefaultApplicationPriority();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public RMContext getRMContext() {
+ return this.rmContext;
+ }
+
+ public void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ /**
+ * Add cluster level priority to store
+ *
+ * @param priorityToAdd
+ * new application priority
+ */
+ public void addToClusterApplicationPriorities(Set priorityToAdd)
+ throws IOException {
+ if (null == priorityToAdd || priorityToAdd.isEmpty()) {
+ return;
+ }
+
+ try {
+ writeLock.lock();
+ Set priorityList = normalizeApplicationPriorities(priorityToAdd);
+
+ for (Priority priority : priorityList) {
+ this.clusterPriorityCollections.add(priority);
+ }
+
+ LOG.info("Add to cluster priority: ["
+ + StringUtils.join(priorityToAdd.iterator(), ",") + "]");
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ protected void verifyAppPrioritiesToRemove(Collection prToRemove)
+ throws IOException {
+ if (null == prToRemove || prToRemove.isEmpty()) {
+ return;
+ }
+
+ // Check if priority to remove doesn't exist or null/empty, will throw
+ // exception
+ for (Integer priority : prToRemove) {
+ if (priority == null) {
+ throw new IOException("Priority to be removed is null");
+ }
+
+ if (!isPriorityExistsInCluster(priority)) {
+ throw new IOException("Priority =" + priority
+ + " to be removed doesn't exist in cluster "
+ + "priority collection.");
+ }
+
+ // check if any queue contains this label
+ for (Entry entry : queueCollections.entrySet()) {
+ String queueName = entry.getKey();
+ ApplicationPriorityPerQueue queueLabels = entry.getValue().applicationPriorityPerQueue;
+ if (queueLabels.getDefaultApplicationPriority().equals(priority)
+ || queueLabels.getMaxApplicationPriority().equals(priority)) {
+ throw new IOException("Cannot remove priority label=" + priority
+ + ", because queue=" + queueName + " is using this label. "
+ + "Please remove label on queue before remove the label");
+ }
+ }
+ }
+ }
+
+ protected void internalRemoveFromClusterAppPriorityList(
+ Collection prToRemove) {
+
+ // remove labels from queue labels collection
+ for (Integer priority : prToRemove) {
+ clusterPriorityCollections.remove(Priority.newInstance(priority));
+ }
+
+ LOG.info("Remove priority: ["
+ + StringUtils.join(prToRemove.iterator(), ",") + "]");
+ }
+
+ /**
+ * Remove from cluster priority list
+ *
+ * @param priorityToRemove
+ * app priority to remove
+ * @throws IOException
+ */
+ public void removeFromClusterApplicationPriority(
+ Collection priorityToRemove) throws IOException {
+ try {
+ writeLock.lock();
+ verifyAppPrioritiesToRemove(priorityToRemove);
+ internalRemoveFromClusterAppPriorityList(priorityToRemove);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Set multiple priorities to Queue
+ *
+ * @param queueName
+ * , applicationPriorityPerQueue max and default priority
+ */
+ public void setApplicationPriorityPerQueue(String queueName,
+ ApplicationPriorityPerQueue applicationPriorityPerQueue)
+ throws IOException {
+ if (null == applicationPriorityPerQueue) {
+ return;
+ }
+
+ // Default priority also need to be in valid range
+ if (applicationPriorityPerQueue.getDefaultApplicationPriority() > applicationPriorityPerQueue
+ .getMaxApplicationPriority()) {
+ throw new IOException("Default Priority configured should be"
+ + "less than or equal to Max application priority '"
+ + applicationPriorityPerQueue.getMaxApplicationPriority());
+ }
+
+ // Validate whether configured priorities are present in Cluster too.
+ if (!validatePerQueueApplicationPriorityInCluster(applicationPriorityPerQueue)) {
+ throw new IOException("Application Priority="
+ + applicationPriorityPerQueue + " to be set to Queue = '" + queueName
+ + "', doesn't exist in cluster " + "priority collection.");
+ }
+
+ try {
+ writeLock.lock();
+
+ Queue currentQueue = null;
+ if (this.queueCollections.containsKey(queueName)) {
+ currentQueue = this.queueCollections.get(queueName);
+ currentQueue.applicationPriorityPerQueue = applicationPriorityPerQueue;
+ } else {
+ currentQueue = new Queue(queueName, applicationPriorityPerQueue);
+ this.queueCollections.put(queueName, currentQueue);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+
+ LOG.info("Set priority labels to Queue: QueueName - " + queueName
+ + "Max Priority - "
+ + applicationPriorityPerQueue.getMaxApplicationPriority()
+ + ", Default Priority - "
+ + applicationPriorityPerQueue.getDefaultApplicationPriority());
+ }
+
+ /**
+ * Get existing valid labels in cluster
+ *
+ * @return existing valid labels in cluster
+ */
+ public Set getClusterPriorities() {
+ try {
+ readLock.lock();
+ Set priorities = new HashSet(
+ clusterPriorityCollections);
+ priorities.remove(Priority.newInstance(DEFAULT_PRIORITY));
+ return Collections.unmodifiableSet(getPrioritiesToSave(priorities));
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Get cluster priority from Queue
+ *
+ * @param queueName
+ * @return PriorityLabelsPerQueue
+ * @throws IOException
+ */
+ public ApplicationPriorityPerQueue getApplicationPriorityFromQueue(
+ String queueName) throws IOException {
+ try {
+ readLock.lock();
+ Queue currentQueue = null;
+ if (this.queueCollections.containsKey(queueName)) {
+ currentQueue = this.queueCollections.get(queueName);
+ } else {
+ throw new IOException("Priority labels for Queue: '" + queueName
+ + "' doesn't exist in cluster.");
+ }
+ return currentQueue.applicationPriorityPerQueue;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Get mapping of queue to priority
+ *
+ * @return queues to priority map
+ */
+ public Map getApplicationPrioritiesPerQueue() {
+ try {
+ readLock.lock();
+ Map queueToPriorities = new HashMap();
+ for (Entry entry : queueCollections.entrySet()) {
+ String queueName = entry.getKey();
+ Queue queue = entry.getValue();
+
+ queueToPriorities.put(queueName, queue.applicationPriorityPerQueue);
+ }
+ return Collections.unmodifiableMap(queueToPriorities);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private Set getPrioritiesToSave(Set priorityList) {
+ Set prioritySet = new HashSet();
+ for (Priority priority : priorityList) {
+ prioritySet.add(priority.getPriority());
+ }
+ return prioritySet;
+ }
+
+ private Set normalizeApplicationPriorities(
+ Set applicationPriorities) {
+ Set priorityList = new HashSet();
+
+ for (Integer priority : applicationPriorities) {
+ if (priority < MIN_PRIORITY_VALUE || priority > MAX_PRIORITY_VALUE) {
+ continue; // skip invalid entries
+ }
+ priorityList.add(Priority.newInstance(priority));
+ }
+ return priorityList;
+ }
+
+ private boolean validatePerQueueApplicationPriorityInCluster(
+ ApplicationPriorityPerQueue applicationPriorityPerQueue) {
+ if (!isPriorityExistsInCluster(applicationPriorityPerQueue
+ .getDefaultApplicationPriority())) {
+ return false;
+ }
+
+ if (!isPriorityExistsInCluster(applicationPriorityPerQueue
+ .getMaxApplicationPriority())) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Verify whether priority exists in repository
+ *
+ * @return true or false
+ */
+ public boolean isPriorityExistsInCluster(Integer priorityValue) {
+ try {
+ readLock.lock();
+ Priority priority = Priority.newInstance(priorityValue);
+ return this.clusterPriorityCollections.contains(priority);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityPerQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityPerQueue.java
new file mode 100644
index 0000000..c99b084
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityPerQueue.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority;
+
+
+
+/**
+ * PriorityLabelsPerQueue holds different types of priority labels in a queue.
+ *
+ * It includes information such as:
+ *
+ * - Maximum Application Priority.
+ * - Default Application Priority.
+ *
+ *
+ *
+ */
+public class ApplicationPriorityPerQueue implements
+ Comparable {
+
+ private Integer defaultAppPriority;
+ private Integer maxAppPriority;
+
+ public static ApplicationPriorityPerQueue newInstance(
+ Integer maxPriorityLabel, Integer defaultPriorityLabel) {
+ ApplicationPriorityPerQueue perQueuePriorityLabels = new ApplicationPriorityPerQueue();
+ perQueuePriorityLabels.setMaxApplicationPriority(maxPriorityLabel);
+ perQueuePriorityLabels.setDefaultApplicationPriority(defaultPriorityLabel);
+ return perQueuePriorityLabels;
+ }
+
+ /**
+ * Get the max application priority of the queue.
+ *
+ * @return max application priority configured in the queue
+ */
+ public Integer getMaxApplicationPriority() {
+ return maxAppPriority;
+ }
+
+ public void setMaxApplicationPriority(Integer maxApplicationPriority) {
+ maxAppPriority = maxApplicationPriority;
+ }
+
+ /**
+ * Get the default application priority of the queue.
+ *
+ * @return default application priority configured in the queue
+ */
+ public Integer getDefaultApplicationPriority() {
+ return defaultAppPriority;
+ }
+
+ public void setDefaultApplicationPriority(Integer defaultApplicationPriority) {
+ defaultAppPriority = defaultApplicationPriority;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ ApplicationPriorityPerQueue other = (ApplicationPriorityPerQueue) obj;
+ if (!this.getDefaultApplicationPriority().equals(
+ other.getDefaultApplicationPriority()))
+ return false;
+
+ if (!this.getMaxApplicationPriority().equals(
+ other.getMaxApplicationPriority()))
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "Max application priority: " + this.getMaxApplicationPriority()
+ + ", " + "Default application priority: "
+ + this.getDefaultApplicationPriority();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 92821;
+ int result = 17;
+ result = result
+ * prime
+ + Math
+ .min(getMaxApplicationPriority(), getDefaultApplicationPriority());
+ result = result
+ * prime
+ + Math
+ .max(getMaxApplicationPriority(), getDefaultApplicationPriority());
+ return result;
+ }
+
+ @Override
+ public int compareTo(ApplicationPriorityPerQueue priorityLabelsPerQueue) {
+ if (priorityLabelsPerQueue == null) {
+ return -1;
+ }
+ int defltLabelCompare = priorityLabelsPerQueue
+ .getDefaultApplicationPriority() - this.getDefaultApplicationPriority();
+ if (defltLabelCompare == 0) {
+ return priorityLabelsPerQueue.getMaxApplicationPriority()
+ - this.getMaxApplicationPriority();
+ } else {
+ return defltLabelCompare;
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index fbcaab9..cab82ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -233,6 +234,14 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
YarnApplicationState createApplicationState();
/**
+ * Returns the application priority
+ *
+ * @return the application priority.
+ */
+
+ Priority getApplicationPriority();
+
+ /**
* Get RMAppMetrics of the {@link RMApp}.
*
* @return metrics
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 2d1737a..67e3886 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -50,6 +50,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -123,7 +124,8 @@
private final Set updatedNodes = new HashSet();
private final String applicationType;
private final Set applicationTags;
-
+ private final Priority appPriority;
+
private final long attemptFailuresValidityInterval;
private Clock systemClock;
@@ -362,7 +364,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
ApplicationMasterService masterService, long submitTime,
String applicationType, Set applicationTags,
- ResourceRequest amReq) {
+ ResourceRequest amReq, Priority priority) {
this.systemClock = new SystemClock();
@@ -382,6 +384,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.applicationType = applicationType;
this.applicationTags = applicationTags;
this.amReq = amReq;
+ this.appPriority = priority;
int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -872,7 +875,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (app.attempts.isEmpty()) {
app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
app.submissionContext.getQueue(), app.user,
- app.submissionContext.getReservationID()));
+ app.submissionContext.getReservationID(), app.getApplicationPriority()));
return RMAppState.SUBMITTED;
}
@@ -880,7 +883,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
// knows applications before AM or NM re-registers.
app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
app.submissionContext.getQueue(), app.user, true,
- app.submissionContext.getReservationID()));
+ app.submissionContext.getReservationID(), app.getApplicationPriority()));
// recover attempts
app.recoverAppAttempts();
@@ -908,7 +911,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
app.submissionContext.getQueue(), app.user,
- app.submissionContext.getReservationID()));
+ app.submissionContext.getReservationID(), app.getApplicationPriority()));
}
}
@@ -1356,4 +1359,9 @@ protected Credentials parseCredentials() throws IOException {
}
return credentials;
}
+
+ @Override
+ public Priority getApplicationPriority() {
+ return this.appPriority;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
index 2c788aa..ce9787c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@Private
@@ -28,16 +29,24 @@
private Queue queue;
private final String user;
private T currentAttempt;
+ private Priority priority;
public SchedulerApplication(Queue queue, String user) {
this.queue = queue;
this.user = user;
+ this.priority = null;
+ }
+
+ public SchedulerApplication(Queue queue, String user, Priority priority) {
+ this.queue = queue;
+ this.user = user;
+ this.priority = priority;
}
public Queue getQueue() {
return queue;
}
-
+
public void setQueue(Queue queue) {
this.queue = queue;
}
@@ -58,4 +67,11 @@ public void stop(RMAppState rmAppFinalState) {
queue.getMetrics().finishApp(user, rmAppFinalState);
}
+ public Priority getPriority() {
+ return priority;
+ }
+
+ public void setPriority(Priority priority) {
+ this.priority = priority;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 532df05..1e5da47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -93,6 +93,7 @@
private boolean unmanagedAM = true;
private boolean amRunning = false;
private LogAggregationContext logAggregationContext;
+ private Priority appPriority = null;
protected List newlyAllocatedContainers =
new ArrayList();
@@ -619,4 +620,12 @@ public synchronized void recoverContainer(RMContainer rmContainer) {
// schedulingOpportunities
// lastScheduledContainer
}
+
+ public Priority getApplicationPriority() {
+ return appPriority;
+ }
+
+ public void setApplicationPriority(Priority appPriority) {
+ this.appPriority = appPriority;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 28ce264..bcca9e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -148,6 +149,11 @@ public int compare(CSQueue q1, CSQueue q2) {
new Comparator() {
@Override
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
+ if (a1.getApplicationPriority() != null
+ && !a1.getApplicationPriority().equals(a2.getApplicationPriority())) {
+ return a1.getApplicationPriority().compareTo(
+ a2.getApplicationPriority());
+ }
return a1.getApplicationId().compareTo(a2.getApplicationId());
}
};
@@ -668,7 +674,7 @@ else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
}
private synchronized void addApplication(ApplicationId applicationId,
- String queueName, String user, boolean isAppRecovering) {
+ String queueName, String user, boolean isAppRecovering, Priority priority) {
if (mappings != null && mappings.size() > 0) {
try {
@@ -737,7 +743,7 @@ private synchronized void addApplication(ApplicationId applicationId,
// update the metrics
queue.getMetrics().submitApp(user);
SchedulerApplication application =
- new SchedulerApplication(queue, user);
+ new SchedulerApplication(queue, user, priority);
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName);
@@ -767,6 +773,9 @@ private synchronized void addApplicationAttempt(
.getCurrentAppAttempt());
}
application.setCurrentAppAttempt(attempt);
+ if (application.getPriority() != null) {
+ attempt.setApplicationPriority(application.getPriority());
+ }
queue.submitApplicationAttempt(attempt, application.getUser());
LOG.info("Added Application Attempt " + applicationAttemptId
@@ -1158,7 +1167,8 @@ public void handle(SchedulerEvent event) {
addApplication(appAddedEvent.getApplicationId(),
queueName,
appAddedEvent.getUser(),
- appAddedEvent.getIsAppRecovering());
+ appAddedEvent.getIsAppRecovering(),
+ appAddedEvent.getApplicatonPriority());
}
}
break;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 102e553..b16f324 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -191,6 +191,15 @@
@Private
public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption";
+
+ @Private
+ public static final String MAX_APPLICATION_PRIORITY = "max_application_priority";
+
+ @Private
+ public static final String DEFAULT_APPLICATION_PRIORITY = "default_application_priority";
+
+ @Private
+ public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = -1;
@Private
public static class QueueMapping {
@@ -859,4 +868,17 @@ public boolean getPreemptionDisabled(String queue, boolean defaultVal) {
defaultVal);
return preemptionDisabled;
}
+
+ public Integer getMaxApplicationPriorityConfPerQueue(String queue) {
+ Integer maxPriority = getInt(getQueuePrefix(queue)
+ + MAX_APPLICATION_PRIORITY, DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
+ return maxPriority;
+ }
+
+ public Integer getDefaultApplicationPriorityConfPerQueue(String queue) {
+ Integer dfltPriority = getInt(getQueuePrefix(queue)
+ + DEFAULT_APPLICATION_PRIORITY,
+ DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
+ return dfltPriority;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 3910ac8..61090e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityPerQueue;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -93,6 +94,10 @@
private int nodeLocalityDelay;
+ private Integer maxAppPriorityPerQueue;
+ private Integer dfltAppPriorityPerQueue;
+
+
Set activeApplications;
Map applicationAttemptMap =
new HashMap();
@@ -208,6 +213,23 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
labelStrBuilder.append(",");
}
}
+
+ maxAppPriorityPerQueue = conf
+ .getMaxApplicationPriorityConfPerQueue(getQueuePath());
+ dfltAppPriorityPerQueue = conf
+ .getDefaultApplicationPriorityConfPerQueue(getQueuePath());
+
+ // Set the priority level configuration of a queue to ApplicationPriority
+ // Manager
+ if (maxAppPriorityPerQueue != -1 || dfltAppPriorityPerQueue != -1) {
+ this.scheduler
+ .getRMContext()
+ .getApplicationPriorityManager()
+ .setApplicationPriorityPerQueue(
+ this.queueName,
+ ApplicationPriorityPerQueue.newInstance(maxAppPriorityPerQueue,
+ dfltAppPriorityPerQueue));
+ }
LOG.info("Initializing " + queueName + "\n" +
"capacity = " + queueCapacities.getCapacity() +
@@ -254,7 +276,9 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
"nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
"reservationsContinueLooking = " +
reservationsContinueLooking + "\n" +
- "preemptionDisabled = " + getPreemptionDisabled() + "\n");
+ "preemptionDisabled = " + getPreemptionDisabled() + "\n" +
+ "maxAppPriorityPerQueue = " + maxAppPriorityPerQueue + "\n" +
+ "dfltAppPriorityPerQueue = " + dfltAppPriorityPerQueue);
}
@Override
@@ -1946,6 +1970,14 @@ public void setMaxApplications(int maxApplications) {
this.maxApplications = maxApplications;
}
+ public Integer getMaxAppPriorityPerQueue() {
+ return maxAppPriorityPerQueue;
+ }
+
+ public Integer getDefaultAppPriorityPerQueue() {
+ return dfltAppPriorityPerQueue;
+ }
+
/*
* Holds shared values used by all applications in
* the queue to calculate headroom on demand
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
index a54e4bf..f88b3a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
public class AppAddedSchedulerEvent extends SchedulerEvent {
@@ -28,25 +29,27 @@
private final String user;
private final ReservationId reservationID;
private final boolean isAppRecovering;
+ private final Priority appPriority;
public AppAddedSchedulerEvent(
ApplicationId applicationId, String queue, String user) {
- this(applicationId, queue, user, false, null);
+ this(applicationId, queue, user, false, null, null);
}
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
- String user, ReservationId reservationID) {
- this(applicationId, queue, user, false, reservationID);
+ String user, ReservationId reservationID, Priority appPriority) {
+ this(applicationId, queue, user, false, reservationID, appPriority);
}
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
- String user, boolean isAppRecovering, ReservationId reservationID) {
+ String user, boolean isAppRecovering, ReservationId reservationID, Priority appPriority) {
super(SchedulerEventType.APP_ADDED);
this.applicationId = applicationId;
this.queue = queue;
this.user = user;
this.reservationID = reservationID;
this.isAppRecovering = isAppRecovering;
+ this.appPriority = appPriority;
}
public ApplicationId getApplicationId() {
@@ -68,4 +71,8 @@ public boolean getIsAppRecovering() {
public ReservationId getReservationID() {
return reservationID;
}
+
+ public Priority getApplicatonPriority() {
+ return appPriority;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 06c6b32..72d2237 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -283,6 +284,15 @@ public RMApp submitApp(int masterMemory, String name, String user,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
waitForAccepted);
}
+
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map acls, String queue,
+ boolean waitForAccepted, Integer priority) throws Exception {
+ return submitApp(masterMemory, name, user, acls, false, queue,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+ waitForAccepted, false, priority);
+ }
public RMApp submitApp(int masterMemory, String name, String user,
Map acls, boolean unmanaged, String queue,
@@ -313,7 +323,16 @@ public RMApp submitApp(int masterMemory, String name, String user,
boolean waitForAccepted, boolean keepContainers) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
- false, null, 0, null, true);
+ false, null, 0, null, true, -1);
+ }
+
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map acls, boolean unmanaged, String queue,
+ int maxAppAttempts, Credentials ts, String appType,
+ boolean waitForAccepted, boolean keepContainers, Integer priority) throws Exception {
+ return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+ maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
+ false, null, 0, null, true, priority);
}
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
@@ -322,7 +341,7 @@ public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
- false, null, attemptFailuresValidityInterval, null, true);
+ false, null, attemptFailuresValidityInterval, null, true, -1);
}
public RMApp submitApp(int masterMemory, String name, String user,
@@ -332,7 +351,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
ApplicationId applicationId) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
- isAppIdProvided, applicationId, 0, null, true);
+ isAppIdProvided, applicationId, 0, null, true, -1);
}
public RMApp submitApp(int masterMemory,
@@ -341,7 +360,7 @@ public RMApp submitApp(int masterMemory,
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
- false, null, 0, logAggregationContext, true);
+ false, null, 0, logAggregationContext, true, -1);
}
public RMApp submitApp(int masterMemory, String name, String user,
@@ -351,6 +370,22 @@ public RMApp submitApp(int masterMemory, String name, String user,
ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete)
throws Exception {
+ return submitApp(masterMemory, name, user,
+ acls, unmanaged, queue,
+ maxAppAttempts, ts, appType,
+ waitForAccepted, keepContainers, isAppIdProvided,
+ applicationId, attemptFailuresValidityInterval,
+ logAggregationContext, cancelTokensWhenComplete, -1);
+ }
+
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map acls, boolean unmanaged, String queue,
+ int maxAppAttempts, Credentials ts, String appType,
+ boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
+ ApplicationId applicationId, long attemptFailuresValidityInterval,
+ LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete,
+ Integer priority)
+ throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
if (! isAppIdProvided) {
@@ -373,6 +408,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
sub.setQueue(queue);
}
sub.setApplicationType(appType);
+ sub.setPriority(Priority.newInstance(priority));
ContainerLaunchContext clc = Records
.newRecord(ContainerLaunchContext.class);
final Resource capability = Records.newRecord(Resource.class);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
index 9f54de8..5e564de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
@@ -153,7 +153,7 @@ protected void submitApplication(
this.rmContext.getScheduler(),
this.rmContext.getApplicationMasterService(),
submitTime, submissionContext.getApplicationType(),
- submissionContext.getApplicationTags(), null);
+ submissionContext.getApplicationTags(), null, null);
this.rmContext.getRMApps().put(submissionContext.getApplicationId(),
application);
//Do not send RMAppEventType.START event
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index dd2b3f8..678b37f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -101,6 +101,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
@@ -122,6 +123,7 @@
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityManager;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
@@ -1225,7 +1227,7 @@ private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
System.currentTimeMillis(), "YARN", null,
BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
- Resource.newInstance(1024, 1), 1)){
+ Resource.newInstance(1024, 1), 1), null){
@Override
public ApplicationReport createAndGetApplicationReport(
String clientUserName, boolean allowAccess) {
@@ -1512,4 +1514,46 @@ protected ClientRMService createClientRMService() {
rpc.stopProxy(client, conf);
rm.close();
}
+
+ @Test (timeout = 30000)
+ @SuppressWarnings ("rawtypes")
+ public void testAppSubmitWithPriority() throws Exception {
+ YarnScheduler yarnScheduler = mockYarnScheduler();
+ RMContext rmContext = mock(RMContext.class);
+ mockRMContext(yarnScheduler, rmContext);
+ ApplicationPriorityManager appPrMgr = mock(ApplicationPriorityManager.class);
+ when(rmContext.getApplicationPriorityManager()).thenReturn(appPrMgr);
+ RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, null,
+ mock(ApplicationACLsManager.class), new Configuration());
+ when(rmContext.getDispatcher().getEventHandler()).thenReturn(
+ new EventHandler() {
+ public void handle(Event event) {
+ }
+ });
+ ApplicationId appId1 = getApplicationId(100);
+
+ ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler,
+ appManager, null, null, null);
+
+ // with name and queue
+ String name = MockApps.newAppName();
+ String queue = MockApps.newQueue();
+ SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(appId1,
+ name, queue);
+ submitRequest1.getApplicationSubmissionContext().setApplicationType(
+ "matchType");
+ submitRequest1.getApplicationSubmissionContext().setPriority(
+ Priority.newInstance(4));
+ try {
+ rmService.submitApplication(submitRequest1);
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ RMApp app1 = rmContext.getRMApps().get(appId1);
+ Assert.assertNotNull("app doesn't exist", app1);
+ Assert.assertEquals("app name doesn't match", name, app1.getName());
+ Assert.assertEquals("app queue doesn't match", queue, app1.getQueue());
+ Assert.assertEquals("app priority doesn't match", Priority.newInstance(4),
+ app1.getApplicationPriority());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityTestBase.java
new file mode 100644
index 0000000..cd752c3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityTestBase.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Assert;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+public class ApplicationPriorityTestBase {
+ public static void assertMapEquals(
+ Map m1,
+ ImmutableMap m2) {
+ Assert.assertEquals(m1.size(), m2.size());
+ for (String k : m1.keySet()) {
+ Assert.assertTrue(m2.containsKey(k));
+ m1.get(k).equals(m2.get(k));
+ }
+ }
+
+ public static void assertMapContains(
+ Map m1,
+ ImmutableMap m2) {
+ for (String k : m2.keySet()) {
+ Assert.assertTrue(m1.containsKey(k));
+ m1.get(k).equals(m2.get(k));
+ }
+ }
+
+ public static void assertCollectionEquals(Collection c1,
+ Collection c2) {
+ Set s1 = new HashSet(c1);
+ Set s2 = new HashSet(c2);
+ Assert.assertEquals(s1, s2);
+ Assert.assertTrue(s1.containsAll(s2));
+ }
+
+ public static Set toSet(E... elements) {
+ Set set = Sets.newHashSet(elements);
+ return set;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/DummyApplicationPriorityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/DummyApplicationPriorityManager.java
new file mode 100644
index 0000000..94e426b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/DummyApplicationPriorityManager.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+public class DummyApplicationPriorityManager
+ extends
+ ApplicationPriorityManager {
+
+ public DummyApplicationPriorityManager() {
+ super();
+ }
+
+ Collection lastAddedPrioritylabels = null;
+ Collection lastRemovedPrioritylabels = null;
+
+ @Override
+ public void addToClusterApplicationPriorities(Set priorityToAdd)
+ throws IOException {
+ lastAddedPrioritylabels = priorityToAdd;
+ super.addToClusterApplicationPriorities(priorityToAdd);
+ }
+
+ @Override
+ public void removeFromClusterApplicationPriority(
+ Collection priorityToRemove) throws IOException {
+ lastRemovedPrioritylabels = priorityToRemove;
+ super.removeFromClusterApplicationPriority(priorityToRemove);
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/TestApplicationPriorityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/TestApplicationPriorityManager.java
new file mode 100644
index 0000000..ba054e0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/TestApplicationPriorityManager.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TestApplicationPriorityManager
+ extends
+ ApplicationPriorityTestBase {
+ DummyApplicationPriorityManager mgr = null;
+
+ @Before
+ public void before() {
+ mgr = new DummyApplicationPriorityManager();
+ mgr.init(new Configuration());
+ mgr.start();
+ }
+
+ @After
+ public void after() {
+ mgr.stop();
+ }
+
+ @Test
+ public void testAddRemovePrioritylabel() throws Exception {
+ // mgr.setMaxPriorityValue(-1);
+ // Add some label
+ mgr.addToClusterApplicationPriorities(ImmutableSet.of(1));
+ assertCollectionEquals(mgr.lastAddedPrioritylabels, Arrays.asList(1));
+
+ // Add a label w/o integer mapping, it should take incremented value from
+ // last updated priority integer value.
+ mgr.addToClusterApplicationPriorities(ImmutableSet.of(3));
+ mgr.addToClusterApplicationPriorities(toSet(5, 7));
+ assertCollectionEquals(mgr.lastAddedPrioritylabels, Sets.newHashSet(5, 7));
+
+ Assert.assertTrue(mgr.getClusterPriorities().containsAll(
+ Sets.newHashSet(1, 3, 5, 7)));
+
+ // try to remove null, empty and non-existed label, should fail
+ for (Integer p : Arrays.asList(null, 9)) {
+ boolean caught = false;
+ try {
+ mgr.removeFromClusterApplicationPriority(Arrays.asList(p));
+ } catch (IOException e) {
+ caught = true;
+ }
+ Assert.assertTrue("remove priority should fail "
+ + "when label is null/empty/non-existed", caught);
+ }
+
+ // Remove some label
+ mgr.removeFromClusterApplicationPriority(Arrays.asList(3));
+ assertCollectionEquals(mgr.lastRemovedPrioritylabels, Arrays.asList(3));
+ Assert.assertTrue(mgr.getClusterPriorities().containsAll(
+ Arrays.asList(1, 5, 7)));
+
+ mgr.removeFromClusterApplicationPriority(Arrays.asList(1, 5, 7));
+ Assert.assertTrue(mgr.lastRemovedPrioritylabels.containsAll(Sets
+ .newHashSet(1, 5, 7)));
+ Assert.assertTrue(mgr.getClusterPriorities().isEmpty());
+ }
+
+ @Test
+ public void testAddandSetPrioritylabel() throws Exception {
+ // Add some label
+ mgr.addToClusterApplicationPriorities(ImmutableSet.of(1));
+ assertCollectionEquals(mgr.lastAddedPrioritylabels, Arrays.asList(1));
+
+ mgr.addToClusterApplicationPriorities(toSet(2, 3));
+ assertCollectionEquals(mgr.lastAddedPrioritylabels, Sets.newHashSet(2, 3));
+
+ Assert.assertTrue(mgr.getClusterPriorities().containsAll(
+ Sets.newHashSet(1, 2, 3)));
+
+ // Set non-exiting labels to a queue
+ boolean caught = false;
+ try {
+ mgr.setApplicationPriorityPerQueue("queueA",
+ ApplicationPriorityPerQueue.newInstance(4, 1));
+ } catch (IOException e) {
+ caught = true;
+ }
+ Assert.assertTrue("Cannot set label to a Queue "
+ + "when label is non-existing in Queue", caught);
+
+ // Set some labels to a queue
+ mgr.setApplicationPriorityPerQueue("queueA",
+ ApplicationPriorityPerQueue.newInstance(2, 1));
+
+ Assert.assertTrue(mgr.getApplicationPriorityFromQueue("queueA").equals(
+ ApplicationPriorityPerQueue.newInstance(2, 1)));
+ }
+
+
+ @Test(timeout = 5000)
+ public void testRemovePrioritylabelAddedToQueue() throws Exception {
+ mgr.addToClusterApplicationPriorities(toSet(1, 2, 3));
+ mgr.setApplicationPriorityPerQueue("queueA",
+ ApplicationPriorityPerQueue.newInstance(2, 1));
+ mgr.setApplicationPriorityPerQueue("queueB",
+ ApplicationPriorityPerQueue.newInstance(3, 1));
+
+ boolean caught = false;
+ try {
+ mgr.removeFromClusterApplicationPriority(ImmutableSet.of(2));
+ } catch (IOException e) {
+ caught = true;
+ }
+ Assert.assertTrue("Trying to remove priority from cluster but "
+ + "label exists in a queue, should fail", caught);
+
+ mgr.setApplicationPriorityPerQueue("queueA",
+ ApplicationPriorityPerQueue.newInstance(1, 3));
+
+ // Now removal should be a success
+ mgr.removeFromClusterApplicationPriority(ImmutableSet.of(2));
+ assertMapEquals(mgr.getApplicationPrioritiesPerQueue(), ImmutableMap.of(
+ "queueB", ApplicationPriorityPerQueue.newInstance(3, 1), "queueA",
+ ApplicationPriorityPerQueue.newInstance(1, 3)));
+ assertCollectionEquals(mgr.lastRemovedPrioritylabels, Arrays.asList(2));
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index f8d92aa..0bd3813 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -190,6 +191,11 @@ public ReservationId getReservationId() {
public ResourceRequest getAMResourceRequest() {
return this.amReq;
}
+
+ @Override
+ public Priority getApplicationPriority() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}
public static RMApp newApplication(int i) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index ec990f9..c2c254f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -57,6 +58,7 @@
RMAppAttempt attempt;
int maxAppAttempts = 1;
ResourceRequest amReq;
+ Priority priority = null;
public MockRMApp(int newid, long time, RMAppState newState) {
finish = time;
@@ -271,4 +273,9 @@ public ReservationId getReservationId() {
public ResourceRequest getAMResourceRequest() {
return this.amReq;
}
+
+ @Override
+ public Priority getApplicationPriority() {
+ return priority;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index 72f1dff..83d16e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -264,7 +264,7 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext)
RMApp application =
new RMAppImpl(applicationId, rmContext, conf, name, user, queue,
submissionContext, scheduler, masterService,
- System.currentTimeMillis(), "YARN", null, null);
+ System.currentTimeMillis(), "YARN", null, null, null);
testAppStartState(applicationId, user, name, queue, application);
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
@@ -975,7 +975,7 @@ public void testRecoverApplication(ApplicationStateData appState,
submissionContext.getApplicationTags(),
BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
- submissionContext.getResource(), 1));
+ submissionContext.getResource(), 1), submissionContext.getPriority());
Assert.assertEquals(RMAppState.NEW, application.getState());
RMAppEvent recoverEvent =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 8656175..be82cdd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -216,7 +216,7 @@ protected void createApplicationWithAMResource(ApplicationAttemptId attId,
RMApp rmApp = new RMAppImpl(attId.getApplicationId(), rmContext, conf,
null, null, null, ApplicationSubmissionContext.newInstance(null, null,
null, null, null, false, false, 0, amResource, null), null, null,
- 0, null, null, null);
+ 0, null, null, null, null);
rmContext.getRMApps().put(attId.getApplicationId(), rmApp);
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
attId.getApplicationId(), queue, user);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index c29dbfc..212f942 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2674,7 +2674,7 @@ public void testNotAllowSubmitApplication() throws Exception {
RMApp application =
new RMAppImpl(applicationId, resourceManager.getRMContext(), conf, name, user,
queue, submissionContext, scheduler, masterService,
- System.currentTimeMillis(), "YARN", null, null);
+ System.currentTimeMillis(), "YARN", null, null, null);
resourceManager.getRMContext().getRMApps().putIfAbsent(applicationId, application);
application.handle(new RMAppEvent(applicationId, RMAppEventType.START));
--
1.9.4.msysgit.1