diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java index aea931ae914..1323bd5b495 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -98,7 +99,7 @@ public static int toYarnApplicationPriority(String priority) { case VERY_LOW : return 1; case DEFAULT : - return 0; + return Integer.parseInt(YarnConfiguration.DEFAULT_APPLICATION_PRIORITY); } throw new IllegalArgumentException("Unrecognized priority: " + priority); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 7d33ed23d92..5551f358907 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -651,15 +651,17 @@ 
public ApplicationSubmissionContext createApplicationSubmissionContext( } String jobPriority = jobConf.get(MRJobConfig.PRIORITY); + int iPriority; if (jobPriority != null) { - int iPriority; try { iPriority = TypeConverter.toYarnApplicationPriority(jobPriority); } catch (IllegalArgumentException e) { iPriority = Integer.parseInt(jobPriority); } - appContext.setPriority(Priority.newInstance(iPriority)); + } else { + iPriority = Integer.parseInt(YarnConfiguration.DEFAULT_APPLICATION_PRIORITY); } + appContext.setPriority(Priority.newInstance(iPriority)); return appContext; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5842d643579..97299713046 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1011,6 +1011,9 @@ public static boolean isAclEnabled(Configuration conf) { /** Default queue name */ public static final String DEFAULT_QUEUE_NAME = "default"; + /** Default application priority */ + public static final String DEFAULT_APPLICATION_PRIORITY = "0"; + /** * Buckets (in minutes) for the number of apps running in each queue. 
*/ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 281aded9e8c..c132f76bd11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -122,6 +122,17 @@ public FSAppAttempt(FairScheduler scheduler, this.appPriority = Priority.newInstance(1); } + public FSAppAttempt(FairScheduler scheduler, + ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue, + ActiveUsersManager activeUsersManager, RMContext rmContext, Priority appPriority) { + super(applicationAttemptId, user, queue, activeUsersManager, rmContext); + + this.scheduler = scheduler; + this.startTime = scheduler.getClock().getTime(); + this.lastTimeAtFairShare = this.startTime; + this.appPriority = appPriority; + } + /** * Get metrics reference from containing queue. 
*/ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index eb9f6af7101..19965c0c8df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -514,7 +515,8 @@ protected void addApplication(ApplicationId applicationId, } SchedulerApplication application = - new SchedulerApplication(queue, user); + new SchedulerApplication(queue, user, + rmApp.getApplicationSubmissionContext().getPriority()); applications.put(applicationId, application); queue.getMetrics().submitApp(user); queue.addAssignedApp(applicationId); @@ -558,7 +560,7 @@ protected void addApplicationAttempt( FSLeafQueue queue = (FSLeafQueue) application.getQueue(); FSAppAttempt attempt = new FSAppAttempt(this, applicationAttemptId, user, - queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext); + queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext, application.getPriority()); if (transferStateFromPreviousAttempt) { attempt.transferStateFromPreviousAttempt( 
application.getCurrentAppAttempt()); @@ -639,6 +641,30 @@ FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { return queue; } + @Override + public Priority checkAndGetApplicationPriority( + Priority priorityRequestedByApp, UserGroupInformation user, + String queueName, ApplicationId applicationId) throws YarnException { + try { + readLock.lock(); + Priority appPriority = priorityRequestedByApp; + + // verify the scenario where priority is null from submissionContext + if (null == appPriority || + Priority.newInstance(0).compareTo(appPriority) < 0) { + // Set to default priority + appPriority = Priority.newInstance(Integer.parseInt( + YarnConfiguration.DEFAULT_APPLICATION_PRIORITY)); + + LOG.info("Application '" + applicationId + "' is submitted with an illegal " + + "priority hence considering default priority: " + appPriority.getPriority()); + } + return appPriority; + } finally { + readLock.unlock(); + } + } + private void removeApplication(ApplicationId applicationId, RMAppState finalState) { SchedulerApplication application = applications.remove( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 8179aa75033..8a55cefdf70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; @@ -87,6 +88,12 @@ public String getName() { @Override public int compare(Schedulable s1, Schedulable s2) { + Priority p1 = s1.getPriority(); + Priority p2 = s2.getPriority(); + if (!p1.equals(p2)) { + return p1.compareTo(p2); + } + int res = compareDemand(s1, s2); // Pre-compute resource usages to avoid duplicate calculation diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestApplicationPriority.java new file mode 100644 index 00000000000..124c7b1b8ab --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestApplicationPriority.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import static junit.framework.Assert.assertTrue; +import static junit.framework.TestCase.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestApplicationPriority { + private final int GB = 1024; + private YarnConfiguration conf; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + } + + @Test + public void testApplicationOrderingWithPriority() throws Exception { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + FairScheduler fs = (FairScheduler)rm.getResourceScheduler(); + QueueManager queueMgr = fs.getQueueManager(); + FSLeafQueue q = queueMgr.getLeafQueue("default", false); + Assert.assertNotNull(q); + + String host = "127.0.0.1"; + RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(16 * GB), + 1, host); + fs.handle(new NodeAddedSchedulerEvent(node)); + + // add app 1 start + ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(appId1, 1); + + RMAppAttemptMetrics attemptMetrics1 = new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext()); + RMAppImpl app1 = mock(RMAppImpl.class); + when(app1.getApplicationId()).thenReturn(appId1); + RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class); + when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1); + when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetrics1); + 
when(app1.getCurrentAppAttempt()).thenReturn(attempt1); + + rm.getRMContext().getRMApps().put(appId1, app1); + + SchedulerEvent addAttemptEvent1 = new AppAddedSchedulerEvent(appId1, + "default", "user", null, Priority.newInstance(5)); + fs.handle(addAttemptEvent1); + // add app1 end + + // add app2 begin + ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2); + ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(appId2, 1); + RMAppAttemptMetrics rmAppAttemptMetrics2 = new RMAppAttemptMetrics(appAttemptId2, rm.getRMContext()); + RMAppImpl app2 = mock(RMAppImpl.class); + when(app2.getApplicationId()).thenReturn(appId2); + RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class); + when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2); + when(attempt2.getRMAppAttemptMetrics()).thenReturn(rmAppAttemptMetrics2); + when(app2.getCurrentAppAttempt()).thenReturn(attempt2); + + rm.getRMContext().getRMApps().put(appId2, app2); + + SchedulerEvent addAppEvent2 = new AppAddedSchedulerEvent(appId2, + "default", "user", null, Priority.newInstance(8)); + fs.handle(addAppEvent2); + // add app end + + assertEquals(2, q.getNumActiveApps()); + + FSAppAttempt appAttempt1 = fs.getSchedulerApplications() + .get(app1.getApplicationId()).getCurrentAppAttempt(); + FSAppAttempt appAttempt2 = fs.getSchedulerApplications() + .get(app2.getApplicationId()).getCurrentAppAttempt(); + + FairSharePolicy policy = new FairSharePolicy(); + Comparator comparator = policy.getComparator(); + assertTrue(comparator.compare(appAttempt1, appAttempt2) > 0); + + rm.stop(); + } + + @Test + public void testApplicationPriorityAllocation() throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + // Set Max Application Priority as 10 + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + MockRM rm = new MockRM(conf); + rm.start(); + + Priority 
appPriority1 = Priority.newInstance(5); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB); + RMApp app1 = rm.submitApp(1 * GB, appPriority1); + + // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1 + MockAM am1 = MockRM.launchAM(app1, rm, nm1); + am1.registerAppAttempt(); + + // allocate 7 containers for App1 + List allocated1 = am1.allocateAndWaitForContainers("127.0.0.1", + 7, 2 * GB, nm1); + + Assert.assertEquals(7, allocated1.size()); + Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemorySize()); + + // check node report, 15 GB used (1 AM and 7 containers) and 1 GB available + SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( + nm1.getNodeId()); + Assert.assertEquals(15 * GB, report_nm1.getUsedResource().getMemorySize()); + Assert.assertEquals(1 * GB, report_nm1.getAvailableResource().getMemorySize()); + + // Submit the second app App2 with priority 8 (Higher than App1) + Priority appPriority2 = Priority.newInstance(8); + RMApp app2 = rm.submitApp(1 * GB, appPriority2); + + // kick the scheduler, 1 GB which was free is given to AM of App2 + MockAM am2 = MockRM.launchAM(app2, rm, nm1); + am2.registerAppAttempt(); + + // check node report, 16 GB used and 0 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemorySize()); + Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize()); + + // get scheduler + FairScheduler fs = (FairScheduler) rm.getResourceScheduler(); + + // get scheduler app + FSAppAttempt schedulerAppAttempt = fs.getSchedulerApplications() + .get(app1.getApplicationId()).getCurrentAppAttempt(); + + // kill 2 containers of App1 to free up some space + int counter = 0; + for (Container c : allocated1) { + if (++counter > 2) { + break; + } + fs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); + } + + // check node report, 12 GB used and 4 GB available + 
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemorySize()); + Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemorySize()); + + // send updated request for App1 + am1.allocate("127.0.0.1", 2 * GB, 10, new ArrayList()); + + // kick the scheduler, since App2 priority is more than App1, it will get + // remaining cluster space. + List allocated2 = am2.allocateAndWaitForContainers("127.0.0.1", + 2, 2 * GB, nm1); + + // App2 has got 2 containers now. + Assert.assertEquals(2, allocated2.size()); + + // check node report, 16 GB used and 0 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemorySize()); + Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize()); + + rm.stop(); + } +} -- 2.17.1