diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 41dc72f..e226e37 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; @@ -529,6 +530,12 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( if (tagsFromConf != null && !tagsFromConf.isEmpty()) { appContext.setApplicationTags(new HashSet(tagsFromConf)); } + + String jobPriority = jobConf.get(MRJobConfig.PRIORITY); + if (jobPriority != null) { + Priority priority = Priority.newInstance(Integer.parseInt(jobPriority)); + appContext.setPriority(priority); + } return appContext; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ff06eea..49b55b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1694,6 +1694,15 @@ private static void addDeprecatedKeys() { "2000, 500"; /** + * Application Priority configurations + */ + public static final String APPLICATION_PRIORITY_PREFIX = YARN_PREFIX + + "application-priority."; + + public static final String YARN_CLUSTER_APP_PRIORITY = APPLICATION_PRIORITY_PREFIX + + "collection"; + + /** * Flag to indicate if the node labels feature enabled, by default it's * disabled */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 03fc40e..46b78a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityManager; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; @@ -95,6 +96,7 @@ private RMApplicationHistoryWriter rmApplicationHistoryWriter; private SystemMetricsPublisher systemMetricsPublisher; private RMNodeLabelsManager nodeLabelManager; + private ApplicationPriorityManager applicationPriorityManager; private long epoch; private Clock systemClock = new SystemClock(); private long schedulerRecoveryStartTime = 0; @@ -417,6 +419,18 @@ public RMNodeLabelsManager getNodeLabelManager() { public void setNodeLabelManager(RMNodeLabelsManager mgr) { nodeLabelManager = mgr; } + + @Private + @Unstable + public ApplicationPriorityManager getApplicationPriorityManager() { + return applicationPriorityManager; + } + + @Private + @Unstable + public void setApplicationPriorityManager(ApplicationPriorityManager mgr) { + applicationPriorityManager = mgr; + } @Private @Unstable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 8dcfe67..928dcb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; @@ -326,6 +328,28 @@ private RMAppImpl createAndPopulateNewRMApp( throws YarnException { ApplicationId applicationId = submissionContext.getApplicationId(); ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext); + + Priority appPriority = submissionContext.getPriority(); + Integer defaultPriority; + if (null != appPriority) { + try { + rmContext.getApplicationPriorityManager() + .authenticateApplicationPriority(submissionContext.getPriority(), + user, submissionContext.getQueue(), applicationId); + } catch (IOException e) { + throw RPCUtil.getRemoteException(e.getMessage()); + } + } else if (null == appPriority + && (defaultPriority = rmContext.getApplicationPriorityManager() + .getDefaultApplicationPriorityFromQueue( + submissionContext.getQueue())) != ApplicationPriorityManager.DEFAULT_PRIORITY) { + // Get the default priority from Queue and set to Application + LOG.info("Application '" + applicationId + + "' is submitted without priority hence default queue priority:" + + defaultPriority + " is set."); + appPriority = Priority.newInstance(defaultPriority); + } + // Create RMApp RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf, @@ -333,7 +357,7 @@ private RMAppImpl createAndPopulateNewRMApp( submissionContext.getQueue(), submissionContext, this.scheduler, this.masterService, submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags(), amReq); + submissionContext.getApplicationTags(), amReq, appPriority); // Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with different applicationIds will not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index ecf6166..ab3a909 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityManager; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -116,6 +117,10 @@ void setRMApplicationHistoryWriter( RMNodeLabelsManager getNodeLabelManager(); public void setNodeLabelManager(RMNodeLabelsManager mgr); + + ApplicationPriorityManager getApplicationPriorityManager(); + + public void setApplicationPriorityManager(ApplicationPriorityManager mgr); long getEpoch(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 1d0d6c0..1b50dd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityManager; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -400,6 +401,16 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) { activeServiceContext.setNodeLabelManager(mgr); } + @Override + public ApplicationPriorityManager getApplicationPriorityManager() { + return activeServiceContext.getApplicationPriorityManager(); + } + + @Override + public void setApplicationPriorityManager(ApplicationPriorityManager mgr) { + activeServiceContext.setApplicationPriorityManager(mgr); + } + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 8bd8e21..e195d91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityManager; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; @@ -337,6 +339,11 @@ protected RMNodeLabelsManager createNodeLabelManager() return new RMNodeLabelsManager(); } + protected ApplicationPriorityManager createApplicationPriorityManager() + throws InstantiationException, IllegalAccessException { + return new ApplicationPriorityManager(); + } + protected DelegationTokenRenewer createDelegationTokenRenewer() { return new DelegationTokenRenewer(); } @@ -426,6 +433,11 @@ protected void serviceInit(Configuration configuration) throws Exception { nlm.setRMContext(rmContext); addService(nlm); rmContext.setNodeLabelManager(nlm); + + ApplicationPriorityManager apm = createApplicationPriorityManager(); + apm.setRMContext(rmContext); + addService(apm); + rmContext.setApplicationPriorityManager(apm); boolean isRecoveryEnabled = conf.getBoolean( YarnConfiguration.RECOVERY_ENABLED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityManager.java new file mode 100644 index 0000000..697725f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityManager.java @@ -0,0 +1,443 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; + +public class ApplicationPriorityManager extends AbstractService { + + protected static final Log LOG = LogFactory + .getLog(ApplicationPriorityManager.class); + public static final String EMPTY_QUEUE_NAME = ""; + public static final Integer DEFAULT_PRIORITY = 0; + + protected Set clusterPriorityCollections = new HashSet(); + + protected final ReadLock readLock; + protected final WriteLock writeLock; + + private static Integer MAX_PRIORITY_VALUE = 65535; + private static Integer MIN_PRIORITY_VALUE = -65535; + + private RMContext rmContext = null; + + // Map to queueName vs Queue object which has priority labels + protected ConcurrentMap queueCollections = new ConcurrentHashMap(); + + public ApplicationPriorityManager() { + super(ApplicationPriorityManager.class.getName()); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + + // Adding DEFAULT_PRIORITY by default to cluster set to avoid default + // issues. + clusterPriorityCollections.add(Priority.newInstance(DEFAULT_PRIORITY)); + } + + protected static class Queue { + private String queueName; + public ApplicationPriorityPerQueue applicationPriorityPerQueue; + + protected Queue() { + this.setQueueName(EMPTY_QUEUE_NAME); + this.applicationPriorityPerQueue = null; + } + + public Queue(String queueName, + ApplicationPriorityPerQueue applicationPriorityPerQueue) { + this.setQueueName(queueName); + this.applicationPriorityPerQueue = applicationPriorityPerQueue; + } + + public String getQueueName() { + return queueName; + } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + + // config file can have cluster priority label config, read and update. + String[] configPrLabels = conf + .getTrimmedStrings(YarnConfiguration.YARN_CLUSTER_APP_PRIORITY); + + getPriorityFromConfig(configPrLabels); + } + + private void getPriorityFromConfig(String[] configPrLabels) + throws IOException { + if (configPrLabels == null || configPrLabels.length == 0) { + LOG.info("Empty configuration for application priority collection in clustger level"); + return; + } + Set priorityList = new HashSet<>(); + try { + for (String priority : configPrLabels) { + priorityList.add(Integer.parseInt(priority)); + } + } catch (NumberFormatException e) { + LOG.error("Invalid priority configuration"); + return; + } + + if (!priorityList.isEmpty()) { + addToClusterApplicationPriorities(priorityList); + } + } + + public void authenticateApplicationPriority(Priority priority, String user, + String queueName, ApplicationId applicationId) throws IOException { + try { + readLock.lock(); + + // verify whether 'priority' is present in cluster + if (!clusterPriorityCollections.contains(priority)) { + throw new IOException("Priority =" + priority.getPriority() + + " is not present in cluster level"); + } + + Queue queue = queueCollections.get(queueName); + if (null == queue) { + throw new IOException("Invalid queue name " + queueName); + } + + // Verify whether submitted priority is lesser than max priority + // in the queue. + if (priority.getPriority() > queue.applicationPriorityPerQueue + .getMaxApplicationPriority()) { + throw new IOException("Invalid priority as Queue: " + queueName + + " cannot support more than priority '" + + queue.applicationPriorityPerQueue.getMaxApplicationPriority() + + "'"); + } + + LOG.info("Submitted priority '" + priority.getPriority() + + "' is acceptable in queue :" + queueName + "for application:" + + applicationId); + } finally { + readLock.unlock(); + } + } + + public Integer getDefaultApplicationPriorityFromQueue(String queueName) { + try { + readLock.lock(); + Queue currentQueue = null; + if (this.queueCollections.containsKey(queueName)) { + currentQueue = this.queueCollections.get(queueName); + } else { + return DEFAULT_PRIORITY; + } + return currentQueue.applicationPriorityPerQueue + .getDefaultApplicationPriority(); + } finally { + readLock.unlock(); + } + } + + public RMContext getRMContext() { + return this.rmContext; + } + + public void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + + /** + * Add cluster level priority to store + * + * @param priorityToAdd + * new application priority + */ + public void addToClusterApplicationPriorities(Set priorityToAdd) + throws IOException { + if (null == priorityToAdd || priorityToAdd.isEmpty()) { + return; + } + + try { + writeLock.lock(); + Set priorityList = normalizeApplicationPriorities(priorityToAdd); + + for (Priority priority : priorityList) { + this.clusterPriorityCollections.add(priority); + } + + LOG.info("Add to cluster priority: [" + + StringUtils.join(priorityToAdd.iterator(), ",") + "]"); + } finally { + writeLock.unlock(); + } + } + + protected void verifyAppPrioritiesToRemove(Collection prToRemove) + throws IOException { + if (null == prToRemove || prToRemove.isEmpty()) { + return; + } + + // Check if priority to remove doesn't exist or null/empty, will throw + // exception + for (Integer priority : prToRemove) { + if (priority == null) { + throw new IOException("Priority to be removed is null"); + } + + if (!isPriorityExistsInCluster(priority)) { + throw new IOException("Priority =" + priority + + " to be removed doesn't exist in cluster " + + "priority collection."); + } + + // check if any queue contains this label + for (Entry entry : queueCollections.entrySet()) { + String queueName = entry.getKey(); + ApplicationPriorityPerQueue queueLabels = entry.getValue().applicationPriorityPerQueue; + if (queueLabels.getDefaultApplicationPriority().equals(priority) + || queueLabels.getMaxApplicationPriority().equals(priority)) { + throw new IOException("Cannot remove priority label=" + priority + + ", because queue=" + queueName + " is using this label. " + + "Please remove label on queue before remove the label"); + } + } + } + } + + protected void internalRemoveFromClusterAppPriorityList( + Collection prToRemove) { + + // remove labels from queue labels collection + for (Integer priority : prToRemove) { + clusterPriorityCollections.remove(Priority.newInstance(priority)); + } + + LOG.info("Remove priority: [" + + StringUtils.join(prToRemove.iterator(), ",") + "]"); + } + + /** + * Remove from cluster priority list + * + * @param priorityToRemove + * app priority to remove + * @throws IOException + */ + public void removeFromClusterApplicationPriority( + Collection priorityToRemove) throws IOException { + try { + writeLock.lock(); + verifyAppPrioritiesToRemove(priorityToRemove); + internalRemoveFromClusterAppPriorityList(priorityToRemove); + } finally { + writeLock.unlock(); + } + } + + /** + * Set multiple priorities to Queue + * + * @param queueName + * , applicationPriorityPerQueue max and default priority + */ + public void setApplicationPriorityPerQueue(String queueName, + ApplicationPriorityPerQueue applicationPriorityPerQueue) + throws IOException { + if (null == applicationPriorityPerQueue) { + return; + } + + // Default priority also need to be in valid range + if (applicationPriorityPerQueue.getDefaultApplicationPriority() > applicationPriorityPerQueue + .getMaxApplicationPriority()) { + throw new IOException("Default Priority configured should be" + + "less than or equal to Max application priority '" + + applicationPriorityPerQueue.getMaxApplicationPriority()); + } + + // Validate whether configured priorities are present in Cluster too. + if (!validatePerQueueApplicationPriorityInCluster(applicationPriorityPerQueue)) { + throw new IOException("Application Priority=" + + applicationPriorityPerQueue + " to be set to Queue = '" + queueName + + "', doesn't exist in cluster " + "priority collection."); + } + + try { + writeLock.lock(); + + Queue currentQueue = null; + if (this.queueCollections.containsKey(queueName)) { + currentQueue = this.queueCollections.get(queueName); + currentQueue.applicationPriorityPerQueue = applicationPriorityPerQueue; + } else { + currentQueue = new Queue(queueName, applicationPriorityPerQueue); + this.queueCollections.put(queueName, currentQueue); + } + } finally { + writeLock.unlock(); + } + + LOG.info("Set priority labels to Queue: QueueName - " + queueName + + "Max Priority - " + + applicationPriorityPerQueue.getMaxApplicationPriority() + + ", Default Priority - " + + applicationPriorityPerQueue.getDefaultApplicationPriority()); + } + + /** + * Get existing valid labels in cluster + * + * @return existing valid labels in cluster + */ + public Set getClusterPriorities() { + try { + readLock.lock(); + Set priorities = new HashSet( + clusterPriorityCollections); + priorities.remove(Priority.newInstance(DEFAULT_PRIORITY)); + return Collections.unmodifiableSet(getPrioritiesToSave(priorities)); + } finally { + readLock.unlock(); + } + } + + /** + * Get cluster priority from Queue + * + * @param queueName + * @return PriorityLabelsPerQueue + * @throws IOException + */ + public ApplicationPriorityPerQueue getApplicationPriorityFromQueue( + String queueName) throws IOException { + try { + readLock.lock(); + Queue currentQueue = null; + if (this.queueCollections.containsKey(queueName)) { + currentQueue = this.queueCollections.get(queueName); + } else { + throw new IOException("Priority labels for Queue: '" + queueName + + "' doesn't exist in cluster."); + } + return currentQueue.applicationPriorityPerQueue; + } finally { + readLock.unlock(); + } + } + + /** + * Get mapping of queue to priority + * + * @return queues to priority map + */ + public Map getApplicationPrioritiesPerQueue() { + try { + readLock.lock(); + Map queueToPriorities = new HashMap(); + for (Entry entry : queueCollections.entrySet()) { + String queueName = entry.getKey(); + Queue queue = entry.getValue(); + + queueToPriorities.put(queueName, queue.applicationPriorityPerQueue); + } + return Collections.unmodifiableMap(queueToPriorities); + } finally { + readLock.unlock(); + } + } + + private Set getPrioritiesToSave(Set priorityList) { + Set prioritySet = new HashSet(); + for (Priority priority : priorityList) { + prioritySet.add(priority.getPriority()); + } + return prioritySet; + } + + private Set normalizeApplicationPriorities( + Set applicationPriorities) { + Set priorityList = new HashSet(); + + for (Integer priority : applicationPriorities) { + if (priority < MIN_PRIORITY_VALUE || priority > MAX_PRIORITY_VALUE) { + continue; // skip invalid entries + } + priorityList.add(Priority.newInstance(priority)); + } + return priorityList; + } + + private boolean validatePerQueueApplicationPriorityInCluster( + ApplicationPriorityPerQueue applicationPriorityPerQueue) { + if (!isPriorityExistsInCluster(applicationPriorityPerQueue + .getDefaultApplicationPriority())) { + return false; + } + + if (!isPriorityExistsInCluster(applicationPriorityPerQueue + .getMaxApplicationPriority())) { + return false; + } + + return true; + } + + /** + * Verify whether priority exists in repository + * + * @return true or false + */ + public boolean isPriorityExistsInCluster(Integer priorityValue) { + try { + readLock.lock(); + Priority priority = Priority.newInstance(priorityValue); + return this.clusterPriorityCollections.contains(priority); + } finally { + readLock.unlock(); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityPerQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityPerQueue.java new file mode 100644 index 0000000..c99b084 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityPerQueue.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority; + + + +/** + *

PriorityLabelsPerQueue holds different types of priority labels in a queue.

+ * + *

It includes information such as: + *

    + *
  • Maximum Application Priority.
  • + *
  • Default Application Priority.
  • + *
+ *

+ * + */ +public class ApplicationPriorityPerQueue implements + Comparable { + + private Integer defaultAppPriority; + private Integer maxAppPriority; + + public static ApplicationPriorityPerQueue newInstance( + Integer maxPriorityLabel, Integer defaultPriorityLabel) { + ApplicationPriorityPerQueue perQueuePriorityLabels = new ApplicationPriorityPerQueue(); + perQueuePriorityLabels.setMaxApplicationPriority(maxPriorityLabel); + perQueuePriorityLabels.setDefaultApplicationPriority(defaultPriorityLabel); + return perQueuePriorityLabels; + } + + /** + * Get the max application priority of the queue. + * + * @return max application priority configured in the queue + */ + public Integer getMaxApplicationPriority() { + return maxAppPriority; + } + + public void setMaxApplicationPriority(Integer maxApplicationPriority) { + maxAppPriority = maxApplicationPriority; + } + + /** + * Get the default application priority of the queue. + * + * @return default application priority configured in the queue + */ + public Integer getDefaultApplicationPriority() { + return defaultAppPriority; + } + + public void setDefaultApplicationPriority(Integer defaultApplicationPriority) { + defaultAppPriority = defaultApplicationPriority; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + ApplicationPriorityPerQueue other = (ApplicationPriorityPerQueue) obj; + if (!this.getDefaultApplicationPriority().equals( + other.getDefaultApplicationPriority())) + return false; + + if (!this.getMaxApplicationPriority().equals( + other.getMaxApplicationPriority())) + return false; + + return true; + } + + @Override + public String toString() { + return "Max application priority: " + this.getMaxApplicationPriority() + + ", " + "Default application priority: " + + this.getDefaultApplicationPriority(); + } + + @Override + public int hashCode() { + final int prime = 92821; + int result = 17; + result = result + * prime + + Math + .min(getMaxApplicationPriority(), getDefaultApplicationPriority()); + result = result + * prime + + Math + .max(getMaxApplicationPriority(), getDefaultApplicationPriority()); + return result; + } + + @Override + public int compareTo(ApplicationPriorityPerQueue priorityLabelsPerQueue) { + if (priorityLabelsPerQueue == null) { + return -1; + } + int defltLabelCompare = priorityLabelsPerQueue + .getDefaultApplicationPriority() - this.getDefaultApplicationPriority(); + if (defltLabelCompare == 0) { + return priorityLabelsPerQueue.getMaxApplicationPriority() + - this.getMaxApplicationPriority(); + } else { + return defltLabelCompare; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index fbcaab9..cab82ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -233,6 +234,14 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, YarnApplicationState createApplicationState(); /** + * Returns the application priority + * + * @return the application priority. + */ + + Priority getApplicationPriority(); + + /** * Get RMAppMetrics of the {@link RMApp}. * * @return metrics diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 2d1737a..67e3886 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -123,7 +124,8 @@ private final Set updatedNodes = new HashSet(); private final String applicationType; private final Set applicationTags; - + private final Priority appPriority; + private final long attemptFailuresValidityInterval; private Clock systemClock; @@ -362,7 +364,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, ApplicationMasterService masterService, long submitTime, String applicationType, Set applicationTags, - ResourceRequest amReq) { + ResourceRequest amReq, Priority priority) { this.systemClock = new SystemClock(); @@ -382,6 +384,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.applicationType = applicationType; this.applicationTags = applicationTags; this.amReq = amReq; + this.appPriority = priority; int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -872,7 +875,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { if (app.attempts.isEmpty()) { app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, app.submissionContext.getQueue(), app.user, - app.submissionContext.getReservationID())); + app.submissionContext.getReservationID(), app.getApplicationPriority())); return RMAppState.SUBMITTED; } @@ -880,7 +883,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { // knows applications before AM or NM re-registers. app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, app.submissionContext.getQueue(), app.user, true, - app.submissionContext.getReservationID())); + app.submissionContext.getReservationID(), app.getApplicationPriority())); // recover attempts app.recoverAppAttempts(); @@ -908,7 +911,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) { app.handler.handle(new AppAddedSchedulerEvent(app.applicationId, app.submissionContext.getQueue(), app.user, - app.submissionContext.getReservationID())); + app.submissionContext.getReservationID(), app.getApplicationPriority())); } } @@ -1356,4 +1359,9 @@ protected Credentials parseCredentials() throws IOException { } return credentials; } + + @Override + public Priority getApplicationPriority() { + return this.appPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index 2c788aa..ce9787c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -19,6 +19,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @Private @@ -28,16 +29,24 @@ private Queue queue; private final String user; private T currentAttempt; + private Priority priority; public SchedulerApplication(Queue queue, String user) { this.queue = queue; this.user = user; + this.priority = null; + } + + public SchedulerApplication(Queue queue, String user, Priority priority) { + this.queue = queue; + this.user = user; + this.priority = priority; } public Queue getQueue() { return queue; } - + public void setQueue(Queue queue) { this.queue = queue; } @@ -58,4 +67,11 @@ public void stop(RMAppState rmAppFinalState) { queue.getMetrics().finishApp(user, rmAppFinalState); } + public Priority getPriority() { + return priority; + } + + public void setPriority(Priority priority) { + this.priority = priority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 532df05..1e5da47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -93,6 +93,7 @@ private boolean unmanagedAM = true; private boolean amRunning = false; private LogAggregationContext logAggregationContext; + private Priority appPriority = null; protected List newlyAllocatedContainers = new ArrayList(); @@ -619,4 +620,12 @@ public synchronized void recoverContainer(RMContainer rmContainer) { // schedulingOpportunities // lastScheduledContainer } + + public Priority getApplicationPriority() { + return appPriority; + } + + public void setApplicationPriority(Priority appPriority) { + this.appPriority = appPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 28ce264..bcca9e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -148,6 +149,11 @@ public int compare(CSQueue q1, CSQueue q2) { new Comparator() { @Override public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { + if (a1.getApplicationPriority() != null + && !a1.getApplicationPriority().equals(a2.getApplicationPriority())) { + return a1.getApplicationPriority().compareTo( + a2.getApplicationPriority()); + } return a1.getApplicationId().compareTo(a2.getApplicationId()); } }; @@ -668,7 +674,7 @@ else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { } private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user, boolean isAppRecovering) { + String queueName, String user, boolean isAppRecovering, Priority priority) { if (mappings != null && mappings.size() > 0) { try { @@ -737,7 +743,7 @@ private synchronized void addApplication(ApplicationId applicationId, // update the metrics queue.getMetrics().submitApp(user); SchedulerApplication application = - new SchedulerApplication(queue, user); + new SchedulerApplication(queue, user, priority); applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); @@ -767,6 +773,9 @@ private synchronized void addApplicationAttempt( .getCurrentAppAttempt()); } application.setCurrentAppAttempt(attempt); + if (application.getPriority() != null) { + attempt.setApplicationPriority(application.getPriority()); + } queue.submitApplicationAttempt(attempt, application.getUser()); LOG.info("Added Application Attempt " + applicationAttemptId @@ -1158,7 +1167,8 @@ public void handle(SchedulerEvent event) { addApplication(appAddedEvent.getApplicationId(), queueName, appAddedEvent.getUser(), - appAddedEvent.getIsAppRecovering()); + appAddedEvent.getIsAppRecovering(), + appAddedEvent.getApplicatonPriority()); } } break; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 102e553..b16f324 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -191,6 +191,15 @@ @Private public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption"; + + @Private + public static final String MAX_APPLICATION_PRIORITY = "max_application_priority"; + + @Private + public static final String DEFAULT_APPLICATION_PRIORITY = "default_application_priority"; + + @Private + public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = -1; @Private public static class QueueMapping { @@ -859,4 +868,17 @@ public boolean getPreemptionDisabled(String queue, boolean defaultVal) { defaultVal); return preemptionDisabled; } + + public Integer getMaxApplicationPriorityConfPerQueue(String queue) { + Integer maxPriority = getInt(getQueuePrefix(queue) + + MAX_APPLICATION_PRIORITY, DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); + return maxPriority; + } + + public Integer getDefaultApplicationPriorityConfPerQueue(String queue) { + Integer dfltPriority = getInt(getQueuePrefix(queue) + + DEFAULT_APPLICATION_PRIORITY, + DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); + return dfltPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 3910ac8..61090e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; +import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityPerQueue; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -93,6 +94,10 @@ private int nodeLocalityDelay; + private Integer maxAppPriorityPerQueue; + private Integer dfltAppPriorityPerQueue; + + Set activeApplications; Map applicationAttemptMap = new HashMap(); @@ -208,6 +213,23 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) labelStrBuilder.append(","); } } + + maxAppPriorityPerQueue = conf + .getMaxApplicationPriorityConfPerQueue(getQueuePath()); + dfltAppPriorityPerQueue = conf + .getDefaultApplicationPriorityConfPerQueue(getQueuePath()); + + // Set the priority level configuration of a queue to ApplicationPriority + // Manager + if (maxAppPriorityPerQueue != -1 || dfltAppPriorityPerQueue != -1) { + this.scheduler + .getRMContext() + .getApplicationPriorityManager() + .setApplicationPriorityPerQueue( + this.queueName, + ApplicationPriorityPerQueue.newInstance(maxAppPriorityPerQueue, + dfltAppPriorityPerQueue)); + } LOG.info("Initializing " + queueName + "\n" + "capacity = " + queueCapacities.getCapacity() + @@ -254,7 +276,9 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + "reservationsContinueLooking = " + reservationsContinueLooking + "\n" + - "preemptionDisabled = " + getPreemptionDisabled() + "\n"); + "preemptionDisabled = " + getPreemptionDisabled() + "\n" + + "maxAppPriorityPerQueue = " + maxAppPriorityPerQueue + "\n" + + "dfltAppPriorityPerQueue = " + dfltAppPriorityPerQueue); } @Override @@ -1946,6 +1970,14 @@ public void setMaxApplications(int maxApplications) { this.maxApplications = maxApplications; } + public Integer getMaxAppPriorityPerQueue() { + return maxAppPriorityPerQueue; + } + + public Integer getDefaultAppPriorityPerQueue() { + return dfltAppPriorityPerQueue; + } + /* * Holds shared values used by all applications in * the queue to calculate headroom on demand diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java index a54e4bf..f88b3a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; public class AppAddedSchedulerEvent extends SchedulerEvent { @@ -28,25 +29,27 @@ private final String user; private final ReservationId reservationID; private final boolean isAppRecovering; + private final Priority appPriority; public AppAddedSchedulerEvent( ApplicationId applicationId, String queue, String user) { - this(applicationId, queue, user, false, null); + this(applicationId, queue, user, false, null, null); } public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, - String user, ReservationId reservationID) { - this(applicationId, queue, user, false, reservationID); + String user, ReservationId reservationID, Priority appPriority) { + this(applicationId, queue, user, false, reservationID, appPriority); } public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, - String user, boolean isAppRecovering, ReservationId reservationID) { + String user, boolean isAppRecovering, ReservationId reservationID, Priority appPriority) { super(SchedulerEventType.APP_ADDED); this.applicationId = applicationId; this.queue = queue; this.user = user; this.reservationID = reservationID; this.isAppRecovering = isAppRecovering; + this.appPriority = appPriority; } public ApplicationId getApplicationId() { @@ -68,4 +71,8 @@ public boolean getIsAppRecovering() { public ReservationId getReservationID() { return reservationID; } + + public Priority getApplicatonPriority() { + return appPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 06c6b32..72d2237 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -283,6 +284,15 @@ public RMApp submitApp(int masterMemory, String name, String user, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, waitForAccepted); } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, String queue, + boolean waitForAccepted, Integer priority) throws Exception { + return submitApp(masterMemory, name, user, acls, false, queue, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + waitForAccepted, false, priority); + } public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, @@ -313,7 +323,16 @@ public RMApp submitApp(int masterMemory, String name, String user, boolean waitForAccepted, boolean keepContainers) throws Exception { return submitApp(masterMemory, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - false, null, 0, null, true); + false, null, 0, null, true, -1); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, Integer priority) throws Exception { + return submitApp(masterMemory, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + false, null, 0, null, true, priority); } public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) @@ -322,7 +341,7 @@ public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, - false, null, attemptFailuresValidityInterval, null, true); + false, null, attemptFailuresValidityInterval, null, true, -1); } public RMApp submitApp(int masterMemory, String name, String user, @@ -332,7 +351,7 @@ public RMApp submitApp(int masterMemory, String name, String user, ApplicationId applicationId) throws Exception { return submitApp(masterMemory, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - isAppIdProvided, applicationId, 0, null, true); + isAppIdProvided, applicationId, 0, null, true, -1); } public RMApp submitApp(int masterMemory, @@ -341,7 +360,7 @@ public RMApp submitApp(int masterMemory, .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, - false, null, 0, logAggregationContext, true); + false, null, 0, logAggregationContext, true, -1); } public RMApp submitApp(int masterMemory, String name, String user, @@ -351,6 +370,22 @@ public RMApp submitApp(int masterMemory, String name, String user, ApplicationId applicationId, long attemptFailuresValidityInterval, LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete) throws Exception { + return submitApp(masterMemory, name, user, + acls, unmanaged, queue, + maxAppAttempts, ts, appType, + waitForAccepted, keepContainers, isAppIdProvided, + applicationId, attemptFailuresValidityInterval, + logAggregationContext, cancelTokensWhenComplete, -1); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, long attemptFailuresValidityInterval, + LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete, + Integer priority) + throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); if (! isAppIdProvided) { @@ -373,6 +408,7 @@ public RMApp submitApp(int masterMemory, String name, String user, sub.setQueue(queue); } sub.setApplicationType(appType); + sub.setPriority(Priority.newInstance(priority)); ContainerLaunchContext clc = Records .newRecord(ContainerLaunchContext.class); final Resource capability = Records.newRecord(Resource.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java index 9f54de8..5e564de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java @@ -153,7 +153,7 @@ protected void submitApplication( this.rmContext.getScheduler(), this.rmContext.getApplicationMasterService(), submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags(), null); + submissionContext.getApplicationTags(), null, null); this.rmContext.getRMApps().put(submissionContext.getApplicationId(), application); //Do not send RMAppEventType.START event diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index dd2b3f8..678b37f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -101,6 +101,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; @@ -122,6 +123,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.applicationpriority.ApplicationPriorityManager; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; @@ -1225,7 +1227,7 @@ private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler, System.currentTimeMillis(), "YARN", null, BuilderUtils.newResourceRequest( RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - Resource.newInstance(1024, 1), 1)){ + Resource.newInstance(1024, 1), 1), null){ @Override public ApplicationReport createAndGetApplicationReport( String clientUserName, boolean allowAccess) { @@ -1512,4 +1514,46 @@ protected ClientRMService createClientRMService() { rpc.stopProxy(client, conf); rm.close(); } + + @Test (timeout = 30000) + @SuppressWarnings ("rawtypes") + public void testAppSubmitWithPriority() throws Exception { + YarnScheduler yarnScheduler = mockYarnScheduler(); + RMContext rmContext = mock(RMContext.class); + mockRMContext(yarnScheduler, rmContext); + ApplicationPriorityManager appPrMgr = mock(ApplicationPriorityManager.class); + when(rmContext.getApplicationPriorityManager()).thenReturn(appPrMgr); + RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, null, + mock(ApplicationACLsManager.class), new Configuration()); + when(rmContext.getDispatcher().getEventHandler()).thenReturn( + new EventHandler() { + public void handle(Event event) { + } + }); + ApplicationId appId1 = getApplicationId(100); + + ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, + appManager, null, null, null); + + // with name and queue + String name = MockApps.newAppName(); + String queue = MockApps.newQueue(); + SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(appId1, + name, queue); + submitRequest1.getApplicationSubmissionContext().setApplicationType( + "matchType"); + submitRequest1.getApplicationSubmissionContext().setPriority( + Priority.newInstance(4)); + try { + rmService.submitApplication(submitRequest1); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app1 = rmContext.getRMApps().get(appId1); + Assert.assertNotNull("app doesn't exist", app1); + Assert.assertEquals("app name doesn't match", name, app1.getName()); + Assert.assertEquals("app queue doesn't match", queue, app1.getQueue()); + Assert.assertEquals("app priority doesn't match", Priority.newInstance(4), + app1.getApplicationPriority()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityTestBase.java new file mode 100644 index 0000000..cd752c3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/ApplicationPriorityTestBase.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.junit.Assert; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; + +public class ApplicationPriorityTestBase { + public static void assertMapEquals( + Map m1, + ImmutableMap m2) { + Assert.assertEquals(m1.size(), m2.size()); + for (String k : m1.keySet()) { + Assert.assertTrue(m2.containsKey(k)); + m1.get(k).equals(m2.get(k)); + } + } + + public static void assertMapContains( + Map m1, + ImmutableMap m2) { + for (String k : m2.keySet()) { + Assert.assertTrue(m1.containsKey(k)); + m1.get(k).equals(m2.get(k)); + } + } + + public static void assertCollectionEquals(Collection c1, + Collection c2) { + Set s1 = new HashSet(c1); + Set s2 = new HashSet(c2); + Assert.assertEquals(s1, s2); + Assert.assertTrue(s1.containsAll(s2)); + } + + public static Set toSet(E... elements) { + Set set = Sets.newHashSet(elements); + return set; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/DummyApplicationPriorityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/DummyApplicationPriorityManager.java new file mode 100644 index 0000000..94e426b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/DummyApplicationPriorityManager.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +public class DummyApplicationPriorityManager + extends + ApplicationPriorityManager { + + public DummyApplicationPriorityManager() { + super(); + } + + Collection lastAddedPrioritylabels = null; + Collection lastRemovedPrioritylabels = null; + + @Override + public void addToClusterApplicationPriorities(Set priorityToAdd) + throws IOException { + lastAddedPrioritylabels = priorityToAdd; + super.addToClusterApplicationPriorities(priorityToAdd); + } + + @Override + public void removeFromClusterApplicationPriority( + Collection priorityToRemove) throws IOException { + lastRemovedPrioritylabels = priorityToRemove; + super.removeFromClusterApplicationPriority(priorityToRemove); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/TestApplicationPriorityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/TestApplicationPriorityManager.java new file mode 100644 index 0000000..ba054e0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/TestApplicationPriorityManager.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class TestApplicationPriorityManager + extends + ApplicationPriorityTestBase { + DummyApplicationPriorityManager mgr = null; + + @Before + public void before() { + mgr = new DummyApplicationPriorityManager(); + mgr.init(new Configuration()); + mgr.start(); + } + + @After + public void after() { + mgr.stop(); + } + + @Test + public void testAddRemovePrioritylabel() throws Exception { + // mgr.setMaxPriorityValue(-1); + // Add some label + mgr.addToClusterApplicationPriorities(ImmutableSet.of(1)); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Arrays.asList(1)); + + // Add a label w/o integer mapping, it should take incremented value from + // last updated priority integer value. + mgr.addToClusterApplicationPriorities(ImmutableSet.of(3)); + mgr.addToClusterApplicationPriorities(toSet(5, 7)); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Sets.newHashSet(5, 7)); + + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Sets.newHashSet(1, 3, 5, 7))); + + // try to remove null, empty and non-existed label, should fail + for (Integer p : Arrays.asList(null, 9)) { + boolean caught = false; + try { + mgr.removeFromClusterApplicationPriority(Arrays.asList(p)); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("remove priority should fail " + + "when label is null/empty/non-existed", caught); + } + + // Remove some label + mgr.removeFromClusterApplicationPriority(Arrays.asList(3)); + assertCollectionEquals(mgr.lastRemovedPrioritylabels, Arrays.asList(3)); + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Arrays.asList(1, 5, 7))); + + mgr.removeFromClusterApplicationPriority(Arrays.asList(1, 5, 7)); + Assert.assertTrue(mgr.lastRemovedPrioritylabels.containsAll(Sets + .newHashSet(1, 5, 7))); + Assert.assertTrue(mgr.getClusterPriorities().isEmpty()); + } + + @Test + public void testAddandSetPrioritylabel() throws Exception { + // Add some label + mgr.addToClusterApplicationPriorities(ImmutableSet.of(1)); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Arrays.asList(1)); + + mgr.addToClusterApplicationPriorities(toSet(2, 3)); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Sets.newHashSet(2, 3)); + + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Sets.newHashSet(1, 2, 3))); + + // Set non-exiting labels to a queue + boolean caught = false; + try { + mgr.setApplicationPriorityPerQueue("queueA", + ApplicationPriorityPerQueue.newInstance(4, 1)); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("Cannot set label to a Queue " + + "when label is non-existing in Queue", caught); + + // Set some labels to a queue + mgr.setApplicationPriorityPerQueue("queueA", + ApplicationPriorityPerQueue.newInstance(2, 1)); + + Assert.assertTrue(mgr.getApplicationPriorityFromQueue("queueA").equals( + ApplicationPriorityPerQueue.newInstance(2, 1))); + } + + + @Test(timeout = 5000) + public void testRemovePrioritylabelAddedToQueue() throws Exception { + mgr.addToClusterApplicationPriorities(toSet(1, 2, 3)); + mgr.setApplicationPriorityPerQueue("queueA", + ApplicationPriorityPerQueue.newInstance(2, 1)); + mgr.setApplicationPriorityPerQueue("queueB", + ApplicationPriorityPerQueue.newInstance(3, 1)); + + boolean caught = false; + try { + mgr.removeFromClusterApplicationPriority(ImmutableSet.of(2)); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("Trying to remove priority from cluster but " + + "label exists in a queue, should fail", caught); + + mgr.setApplicationPriorityPerQueue("queueA", + ApplicationPriorityPerQueue.newInstance(1, 3)); + + // Now removal should be a success + mgr.removeFromClusterApplicationPriority(ImmutableSet.of(2)); + assertMapEquals(mgr.getApplicationPrioritiesPerQueue(), ImmutableMap.of( + "queueB", ApplicationPriorityPerQueue.newInstance(3, 1), "queueA", + ApplicationPriorityPerQueue.newInstance(1, 3))); + assertCollectionEquals(mgr.lastRemovedPrioritylabels, Arrays.asList(2)); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index f8d92aa..0bd3813 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -190,6 +191,11 @@ public ReservationId getReservationId() { public ResourceRequest getAMResourceRequest() { return this.amReq; } + + @Override + public Priority getApplicationPriority() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index ec990f9..c2c254f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -57,6 +58,7 @@ RMAppAttempt attempt; int maxAppAttempts = 1; ResourceRequest amReq; + Priority priority = null; public MockRMApp(int newid, long time, RMAppState newState) { finish = time; @@ -271,4 +273,9 @@ public ReservationId getReservationId() { public ResourceRequest getAMResourceRequest() { return this.amReq; } + + @Override + public Priority getApplicationPriority() { + return priority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 72f1dff..83d16e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -264,7 +264,7 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, user, queue, submissionContext, scheduler, masterService, - System.currentTimeMillis(), "YARN", null, null); + System.currentTimeMillis(), "YARN", null, null, null); testAppStartState(applicationId, user, name, queue, application); this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), @@ -975,7 +975,7 @@ public void testRecoverApplication(ApplicationStateData appState, submissionContext.getApplicationTags(), BuilderUtils.newResourceRequest( RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - submissionContext.getResource(), 1)); + submissionContext.getResource(), 1), submissionContext.getPriority()); Assert.assertEquals(RMAppState.NEW, application.getState()); RMAppEvent recoverEvent = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 8656175..be82cdd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -216,7 +216,7 @@ protected void createApplicationWithAMResource(ApplicationAttemptId attId, RMApp rmApp = new RMAppImpl(attId.getApplicationId(), rmContext, conf, null, null, null, ApplicationSubmissionContext.newInstance(null, null, null, null, null, false, false, 0, amResource, null), null, null, - 0, null, null, null); + 0, null, null, null, null); rmContext.getRMApps().put(attId.getApplicationId(), rmApp); AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent( attId.getApplicationId(), queue, user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index c29dbfc..212f942 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2674,7 +2674,7 @@ public void testNotAllowSubmitApplication() throws Exception { RMApp application = new RMAppImpl(applicationId, resourceManager.getRMContext(), conf, name, user, queue, submissionContext, scheduler, masterService, - System.currentTimeMillis(), "YARN", null, null); + System.currentTimeMillis(), "YARN", null, null, null); resourceManager.getRMContext().getRMApps().putIfAbsent(applicationId, application); application.handle(new RMAppEvent(applicationId, RMAppEventType.START)); -- 1.9.4.msysgit.1