diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index 2c788aa..f7a594d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -19,6 +19,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @Private @@ -28,10 +29,18 @@ private Queue queue; private final String user; private T currentAttempt; + private Priority priority; public SchedulerApplication(Queue queue, String user) { this.queue = queue; this.user = user; + this.setPriority(null); + } + + public SchedulerApplication(Queue queue, String user, Priority priority) { + this.queue = queue; + this.user = user; + this.setPriority(priority); } public Queue getQueue() { @@ -58,4 +67,11 @@ public void stop(RMAppState rmAppFinalState) { queue.getMetrics().finishApp(user, rmAppFinalState); } + public Priority getPriority() { + return priority; + } + + public void setPriority(Priority priority) { + this.priority = priority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 4823390..3368b69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -95,6 +95,7 @@ private boolean unmanagedAM = true; private boolean amRunning = false; private LogAggregationContext logAggregationContext; + private Priority appPriority = null; protected ResourceUsage attemptResourceUsage = new ResourceUsage(); private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0); @@ -710,4 +711,12 @@ public boolean hasPendingResourceRequest(ResourceCalculator rc, public ResourceUsage getAppAttemptResourceUsage() { return this.attemptResourceUsage; } + + public Priority getApplicationPriority() { + return appPriority; + } + + public void setApplicationPriority(Priority appPriority) { + this.appPriority = appPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 1e1623d..ef3020e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -157,6 +158,10 @@ public int compare(CSQueue q1, CSQueue q2) { new Comparator() { @Override public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { + if (!a1.getApplicationPriority().equals(a2.getApplicationPriority())) { + return a1.getApplicationPriority().compareTo( + a2.getApplicationPriority()); + } return a1.getApplicationId().compareTo(a2.getApplicationId()); } }; @@ -683,7 +688,7 @@ else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { } private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user, boolean isAppRecovering) { + String queueName, String user, boolean isAppRecovering, Priority priority) { if (mappings != null && mappings.size() > 0) { try { @@ -752,7 +757,7 @@ private synchronized void addApplication(ApplicationId applicationId, // update the metrics queue.getMetrics().submitApp(user); SchedulerApplication application = - new SchedulerApplication(queue, user); + new SchedulerApplication(queue, user, priority); applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); @@ -782,6 +787,7 @@ private synchronized void addApplicationAttempt( .getCurrentAppAttempt()); } application.setCurrentAppAttempt(attempt); + attempt.setApplicationPriority(application.getPriority()); queue.submitApplicationAttempt(attempt, application.getUser()); LOG.info("Added Application Attempt " + applicationAttemptId @@ -1285,7 +1291,8 @@ public void handle(SchedulerEvent event) { addApplication(appAddedEvent.getApplicationId(), queueName, appAddedEvent.getUser(), - appAddedEvent.getIsAppRecovering()); + appAddedEvent.getIsAppRecovering(), + appAddedEvent.getApplicatonPriority()); } } break; @@ -1781,4 +1788,75 @@ public SchedulerHealth getSchedulerHealth() { private synchronized void setLastNodeUpdateTime(long time) { this.lastNodeUpdateTime = time; } + + @Override + public void authenticateApplicationPriority(Priority priority, String user, + String queueName, ApplicationId applicationId) throws IOException { + Queue queue = getQueue(queueName); + if (null == queue) { + throw new IOException("Invalid queue name " + queueName); + } + + // TODO: Verify whether submitted priority is present in cluster level. + // This check will be done in ApplicationPriorityManager + + // Verify whether submitted priority is lesser than max priority + // in the queue. + if (priority.getPriority() > queue.getMaxApplicationPriority() + .getPriority()) { + throw new IOException("Invalid priority as Queue: " + queueName + + " cannot support more than priority '" + + queue.getMaxApplicationPriority() + "'"); + } + + LOG.debug("Priority '" + priority.getPriority() + + "' is acceptable in queue :" + queueName + "for application:" + + applicationId); + } + + @Override + public synchronized void updateApplicationPriority(Priority newPriority, + String user, String queueName, ApplicationId applicationId) + throws IOException { + SchedulerApplication application = applications + .get(applicationId); + + if(application.getPriority() == newPriority){ + return; + } + + Queue queue = getQueue(queueName); + if (null == queue) { + throw new IOException("Invalid queue name " + queueName); + } + + // Verify whether submitted priority is lesser than max priority + // in the queue. + if (newPriority.getPriority() > queue.getMaxApplicationPriority() + .getPriority()) { + throw new IOException("Invalid priority as Queue: " + queueName + + " cannot support more than priority '" + + queue.getMaxApplicationPriority() + "'"); + } + + // Update new priority in Submission Context to keep track in HA + RMApp rmApp = rmContext.getRMApps().get(applicationId); + rmApp.getApplicationSubmissionContext().setPriority(newPriority); + + synchronized (application) { + application.setPriority(newPriority); + application.getCurrentAppAttempt().setApplicationPriority(newPriority); + } + } + + @Override + public Priority getDefaultApplicationPriorityFromQueue(String queueName) + throws IOException { + Queue queue = getQueue(queueName); + if (null == queue) { + throw new IOException("Invalid queue name " + queueName); + } + + return queue.getDefaultApplicationPriority(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 4e8d617..96a1a23 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -191,6 +191,15 @@ @Private public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption"; + + @Private + public static final String MAX_APPLICATION_PRIORITY = "max_application_priority"; + + @Private + public static final String DEFAULT_APPLICATION_PRIORITY = "default_application_priority"; + + @Private + public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = 0; @Private public static class QueueMapping { @@ -864,4 +873,17 @@ public boolean getPreemptionDisabled(String queue, boolean defaultVal) { defaultVal); return preemptionDisabled; } + + public Integer getMaxApplicationPriorityConfPerQueue(String queue) { + Integer maxPriority = getInt(getQueuePrefix(queue) + + MAX_APPLICATION_PRIORITY, DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); + return maxPriority; + } + + public Integer getDefaultApplicationPriorityConfPerQueue(String queue) { + Integer dfltPriority = getInt(getQueuePrefix(queue) + + DEFAULT_APPLICATION_PRIORITY, + DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); + return dfltPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f860574..49e71fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -93,6 +93,9 @@ private int nodeLocalityDelay; + private Priority maxAppPriorityPerQueue; + private Priority dfltAppPriorityPerQueue; + Set activeApplications; Map applicationAttemptMap = new HashMap(); @@ -208,6 +211,12 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) labelStrBuilder.append(","); } } + + maxAppPriorityPerQueue = Priority.newInstance(conf + .getMaxApplicationPriorityConfPerQueue(getQueuePath())); + dfltAppPriorityPerQueue = Priority.newInstance(conf + .getDefaultApplicationPriorityConfPerQueue(getQueuePath())); + LOG.info("Initializing " + queueName + "\n" + "capacity = " + queueCapacities.getCapacity() + @@ -254,7 +263,9 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + "reservationsContinueLooking = " + reservationsContinueLooking + "\n" + - "preemptionDisabled = " + getPreemptionDisabled() + "\n"); + "preemptionDisabled = " + getPreemptionDisabled() + "\n" + + "maxAppPriorityPerQueue = " + maxAppPriorityPerQueue + "\n" + + "dfltAppPriorityPerQueue = " + dfltAppPriorityPerQueue); } @Override @@ -2017,4 +2028,14 @@ public Resource getClusterResource() { return clusterResource; } } + + @Override + public Priority getDefaultApplicationPriority() { + return dfltAppPriorityPerQueue; + } + + @Override + public Priority getMaxApplicationPriority() { + return maxAppPriorityPerQueue; + } } -- 1.9.4.msysgit.1