diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index 2c788aa..f7a594d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -19,6 +19,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @Private @@ -28,10 +29,18 @@ private Queue queue; private final String user; private T currentAttempt; + private Priority priority; public SchedulerApplication(Queue queue, String user) { this.queue = queue; this.user = user; + this.setPriority(null); + } + + public SchedulerApplication(Queue queue, String user, Priority priority) { + this.queue = queue; + this.user = user; + this.setPriority(priority); } public Queue getQueue() { @@ -58,4 +67,11 @@ public void stop(RMAppState rmAppFinalState) { queue.getMetrics().finishApp(user, rmAppFinalState); } + public Priority getPriority() { + return priority; + } + + public void setPriority(Priority priority) { + this.priority = priority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index bf5641d..8bc3aa3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -92,6 +92,7 @@ private boolean unmanagedAM = true; private boolean amRunning = false; private LogAggregationContext logAggregationContext; + private Priority appPriority = null; protected ResourceUsage attemptResourceUsage = new ResourceUsage(); private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0); @@ -665,4 +666,12 @@ public void recordContainerAllocationTime(long value) { } } } + + public Priority getApplicationPriority() { + return appPriority; + } + + public void setApplicationPriority(Priority appPriority) { + this.appPriority = appPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c86c0ff..c814790 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -148,6 +149,11 @@ public int compare(CSQueue q1, CSQueue q2) { new Comparator() { @Override public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { + if (a1.getApplicationPriority() != null + && !a1.getApplicationPriority().equals(a2.getApplicationPriority())) { + return a1.getApplicationPriority().compareTo( + a2.getApplicationPriority()); + } return a1.getApplicationId().compareTo(a2.getApplicationId()); } }; @@ -668,7 +674,7 @@ else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { } private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user, boolean isAppRecovering) { + String queueName, String user, boolean isAppRecovering, Priority priority) { if (mappings != null && mappings.size() > 0) { try { @@ -737,7 +743,7 @@ private synchronized void addApplication(ApplicationId applicationId, // update the metrics queue.getMetrics().submitApp(user); SchedulerApplication application = - new SchedulerApplication(queue, user); + new SchedulerApplication(queue, user, priority); applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); @@ -767,6 +773,9 @@ private synchronized void addApplicationAttempt( .getCurrentAppAttempt()); } application.setCurrentAppAttempt(attempt); + if (application.getPriority() != null) { + attempt.setApplicationPriority(application.getPriority()); + } queue.submitApplicationAttempt(attempt, application.getUser()); LOG.info("Added Application Attempt " + applicationAttemptId @@ -1168,7 +1177,8 @@ public void handle(SchedulerEvent event) { addApplication(appAddedEvent.getApplicationId(), queueName, appAddedEvent.getUser(), - appAddedEvent.getIsAppRecovering()); + appAddedEvent.getIsAppRecovering(), + appAddedEvent.getApplicatonPriority()); } } break; @@ -1648,4 +1658,29 @@ private String handleMoveToPlanQueue(String targetQueueName) { } return ret; } + + @Override + public void authenticateApplicationPriority(Priority priority, String user, + String queueName, ApplicationId applicationId) throws IOException { + Queue queue = getQueue(queueName); + if (null == queue) { + throw new IOException("Invalid queue name " + queueName); + } + + // Verify whether submitted priority is present in cluster level. + // This check will be done ApplicationPriorityManager + + // Verify whether submitted priority is lesser than max priority + // in the queue. + if (priority.getPriority() > queue.getMaxApplicationPriority() + .getPriority()) { + throw new IOException("Invalid priority as Queue: " + queueName + + " cannot support more than priority '" + + queue.getMaxApplicationPriority() + "'"); + } + + LOG.info("Submitted priority '" + priority.getPriority() + + "' is acceptable in queue :" + queueName + "for application:" + + applicationId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 102e553..e950417 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -193,6 +193,15 @@ public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption"; @Private + public static final String MAX_APPLICATION_PRIORITY = "max_application_priority"; + + @Private + public static final String DEFAULT_APPLICATION_PRIORITY = "default_application_priority"; + + @Private + public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = -1; + + @Private public static class QueueMapping { public enum MappingType { @@ -859,4 +868,17 @@ public boolean getPreemptionDisabled(String queue, boolean defaultVal) { defaultVal); return preemptionDisabled; } + + public Integer getMaxApplicationPriorityConfPerQueue(String queue) { + Integer maxPriority = getInt(getQueuePrefix(queue) + + MAX_APPLICATION_PRIORITY, DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); + return maxPriority; + } + + public Integer getDefaultApplicationPriorityConfPerQueue(String queue) { + Integer dfltPriority = getInt(getQueuePrefix(queue) + + DEFAULT_APPLICATION_PRIORITY, + DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); + return dfltPriority; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 3e5405d..2270b0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -92,6 +92,9 @@ private float maxAMResourcePerQueuePercent; private int nodeLocalityDelay; + + private Priority maxAppPriorityPerQueue; + private Priority dfltAppPriorityPerQueue; Set activeApplications; Map applicationAttemptMap = @@ -208,6 +211,11 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) labelStrBuilder.append(","); } } + + maxAppPriorityPerQueue = Priority.newInstance(conf + .getMaxApplicationPriorityConfPerQueue(getQueuePath())); + dfltAppPriorityPerQueue = Priority.newInstance(conf + .getDefaultApplicationPriorityConfPerQueue(getQueuePath())); LOG.info("Initializing " + queueName + "\n" + "capacity = " + queueCapacities.getCapacity() + @@ -254,7 +262,9 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + "reservationsContinueLooking = " + reservationsContinueLooking + "\n" + - "preemptionDisabled = " + getPreemptionDisabled() + "\n"); + "preemptionDisabled = " + getPreemptionDisabled() + "\n" + + "maxAppPriorityPerQueue = " + maxAppPriorityPerQueue + "\n" + + "dfltAppPriorityPerQueue = " + dfltAppPriorityPerQueue); } @Override @@ -1956,4 +1966,14 @@ public Resource getClusterResource() { return clusterResource; } } + + @Override + public Priority getDefaultApplicationPriority() { + return dfltAppPriorityPerQueue; + } + + @Override + public Priority getMaxApplicationPriority() { + return maxAppPriorityPerQueue; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 06c6b32..d38dd16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -283,6 +284,15 @@ public RMApp submitApp(int masterMemory, String name, String user, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, waitForAccepted); } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, String queue, + boolean waitForAccepted, Integer priority) throws Exception { + return submitApp(masterMemory, name, user, acls, false, queue, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + waitForAccepted, false, priority); + } public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, @@ -313,7 +323,16 @@ public RMApp submitApp(int masterMemory, String name, String user, boolean waitForAccepted, boolean keepContainers) throws Exception { return submitApp(masterMemory, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - false, null, 0, null, true); + false, null, 0, null, true, -1); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, Integer priority) throws Exception { + return submitApp(masterMemory, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + false, null, 0, null, true, priority); } public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) @@ -322,7 +341,7 @@ public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, - false, null, attemptFailuresValidityInterval, null, true); + false, null, attemptFailuresValidityInterval, null, true, -1); } public RMApp submitApp(int masterMemory, String name, String user, @@ -332,7 +351,7 @@ public RMApp submitApp(int masterMemory, String name, String user, ApplicationId applicationId) throws Exception { return submitApp(masterMemory, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - isAppIdProvided, applicationId, 0, null, true); + isAppIdProvided, applicationId, 0, null, true, -1); } public RMApp submitApp(int masterMemory, @@ -341,7 +360,7 @@ public RMApp submitApp(int masterMemory, .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, - false, null, 0, logAggregationContext, true); + false, null, 0, logAggregationContext, true, -1); } public RMApp submitApp(int masterMemory, String name, String user, @@ -351,6 +370,22 @@ public RMApp submitApp(int masterMemory, String name, String user, ApplicationId applicationId, long attemptFailuresValidityInterval, LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete) throws Exception { + return submitApp(masterMemory, name, user, + acls, unmanaged, queue, + maxAppAttempts, ts, appType, + waitForAccepted, keepContainers, isAppIdProvided, + applicationId, attemptFailuresValidityInterval, + logAggregationContext, cancelTokensWhenComplete, -1); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, long attemptFailuresValidityInterval, + LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete, + Integer priority) + throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); if (! isAppIdProvided) { @@ -373,6 +408,7 @@ public RMApp submitApp(int masterMemory, String name, String user, sub.setQueue(queue); } sub.setApplicationType(appType); + sub.setPriority(Priority.newInstance(priority)); ContainerLaunchContext clc = Records .newRecord(ContainerLaunchContext.class); final Resource capability = Records.newRecord(Resource.class); -- 1.9.4.msysgit.1