diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index fa0835a..8a15ac6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -204,6 +204,8 @@ private String getDefaultPlanFollower() { // currently only capacity scheduler is supported if (scheduler instanceof CapacityScheduler) { return CapacitySchedulerPlanFollower.class.getName(); + } else if (scheduler instanceof FairScheduler) { + return FairSchedulerPlanFollower.class.getName(); } return null; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java new file mode 100644 index 0000000..e9b28d9 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FairSchedulerPlanFollower extends AbstractSchedulerPlanFollower { + private static final Logger LOG = LoggerFactory + .getLogger(FairSchedulerPlanFollower.class); + + private FairScheduler fs; + + @Override + public void init(Clock clock, ResourceScheduler sched, + Collection plans) { + super.init(clock, sched, plans); + fs = (FairScheduler)sched; + LOG.info("Initializing Plan Follower Policy:" + + this.getClass().getCanonicalName()); + } + + @Override + protected Queue getPlanQueue(String planQueueName) { + Queue planQueue = fs.getQueueManager().getParentQueue(planQueueName, false); + if (planQueue == null) { + LOG.error("The queue " + planQueueName + " cannot be found or is not a " + + "ParentQueue"); + } + return planQueue; + } + + @Override + protected float calculateTargetCapacity(Resource clusterResources, + Resource planResources, Resource capToAssign) { + return Resources.divide(fs.getResourceCalculator(), + clusterResources, capToAssign, planResources); + } + + @Override + protected boolean isPlanResourcesLessThanReservations(Resource + clusterResources, Resource planResources, Resource reservedResources) { + return Resources.greaterThan(fs.getResourceCalculator(), + clusterResources, reservedResources, planResources); + } + + @Override + protected List getChildReservationQueues(Queue queue) { + FSQueue planQueue = (FSQueue)queue; + List childQueues = planQueue.getChildQueues(); + return childQueues; + } + + + @Override + protected void addReservationQueue(String planQueueName, Queue queue, + String currResId) { + String leafQueueName = getReservationQueueName(planQueueName, currResId); + fs.getQueueManager().getLeafQueue(leafQueueName, true); + } + + @Override + protected void createDefaultReservationQueue(String planQueueName, + Queue queue, String defReservationId) { + String defReservationQueueName = getReservationQueueName(planQueueName, + defReservationId); + if (!fs.getQueueManager().exists(defReservationQueueName)) { + fs.getQueueManager().getLeafQueue(defReservationQueueName, true); + } + } + + @Override + protected Resource getPlanResources(Plan plan, Queue queue, + Resource clusterResources) { + FSParentQueue planQueue = (FSParentQueue)queue; + Resource planResources = planQueue.getSteadyFairShare(); + return planResources; + } + + @Override + protected Resource getReservationQueueResourceIfExists(Plan plan, + ReservationId reservationId) { + String reservationQueueName = getReservationQueueName(plan.getQueueName() + , reservationId.toString()); + FSLeafQueue reservationQueue = fs.getQueueManager().getLeafQueue + (reservationQueueName, false); + Resource reservationResource = null; + if (reservationQueue != null) { + reservationResource = reservationQueue.getSteadyFairShare(); + } + return reservationResource; + } + + @Override + protected String getReservationQueueName(String planQueueName, + String reservationQueueName) { + String planQueueNameFullPath = fs.getQueueManager().getQueue + (planQueueName).getName(); + + if (!reservationQueueName.startsWith(planQueueNameFullPath)) { + // If name is not a path we need full path for FairScheduler. See + // YARN-2773 for the root cause + return planQueueNameFullPath + "." + reservationQueueName; + } + return reservationQueueName; + } + + @Override + protected String getReservationIdFromQueueName(String resQueueName) { + return resQueueName.substring(resQueueName.lastIndexOf(".") + 1); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index fd99d65..0ea7314 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -207,6 +207,10 @@ public ResourceWeights getQueueWeight(String queue) { ResourceWeights weight = queueWeights.get(queue); return (weight == null) ? ResourceWeights.NEUTRAL : weight; } + + public void setQueueWeight(String queue, ResourceWeights weight) { + queueWeights.put(queue, weight); + } public int getUserMaxApps(String user) { Integer maxApps = userMaxApps.get(user); @@ -323,4 +327,14 @@ public boolean getMoveOnExpiry(String queue) { public long getEnforcementWindow(String queue) { return globalReservationQueueConfig.getEnforcementWindowMsec(); } + + @VisibleForTesting + public void setReservationWindow(long window) { + globalReservationQueueConfig.setReservationWindow(window); + } + + @VisibleForTesting + public void setAverageCapacity(int avgCapacity) { + globalReservationQueueConfig.setAverageCapacity(avgCapacity); + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 91bea11..3c97535 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; @@ -516,6 +517,15 @@ public void updateStarvationStats() { } } + /** Allows setting weight for a dynamically created queue + * Currently only used for reservation based queues + * @param weight queue weight + */ + public void setWeights(float weight) { + scheduler.getAllocationConfiguration().setQueueWeight(getName(), + new ResourceWeights(weight)); + } + /** * Helper method to check if the queue should preempt containers * diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 57b41af..1424cc2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -68,6 +69,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; @@ -1163,9 +1166,15 @@ public void handle(SchedulerEvent event) { throw new RuntimeException("Unexpected event type: " + event); } AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; - addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser(), - appAddedEvent.getIsAppRecovering()); + String queueName = + resolveReservationQueueName(appAddedEvent.getQueue(), + appAddedEvent.getApplicationId(), + appAddedEvent.getReservationID()); + if (queueName != null) { + addApplication(appAddedEvent.getApplicationId(), + queueName, appAddedEvent.getUser(), + appAddedEvent.getIsAppRecovering()); + } break; case APP_REMOVED: if (!(event instanceof AppRemovedSchedulerEvent)) { @@ -1223,6 +1232,51 @@ public void handle(SchedulerEvent event) { } } + private String resolveReservationQueueName(String queueName, + ApplicationId applicationId, ReservationId reservationID) { + FSQueue queue = queueMgr.getQueue(queueName); + if (queue == null || !(allocConf.isReservable(queue.getQueueName()))) { + return queueName; + } + // Use fully specified name from now on (including root. prefix) + queueName = queue.getQueueName(); + if (reservationID != null) { + String resQName = queueName + "." + reservationID.toString(); + queue = queueMgr.getQueue(resQName); + if (queue == null) { + String message = + "Application " + + applicationId + + " submitted to a reservation which is not yet currently active: " + + resQName; + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); + return null; + } + if (!queue.getParent().getQueueName().equals(queueName)) { + String message = + "Application: " + applicationId + " submitted to a reservation " + + resQName + " which does not belong to the specified queue: " + + queueName; + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); + return null; + } + // use the reservation queue to run the app + queueName = resQName; + } else { + // use the default child queue of the plan for unreserved apps + queueName = getDefaultQueueForPlanQueue(queueName); + } + return queueName; + } + + private String getDefaultQueueForPlanQueue(String queueName) { + String planName = queueName.substring(queueName.lastIndexOf(".") + 1); + queueName = queueName + "." + planName + PlanQueue.DEFAULT_QUEUE_SUFFIX; + return queueName; + } + @Override public void recover(RMState state) throws Exception { // NOT IMPLEMENTED @@ -1441,7 +1495,8 @@ public synchronized String moveApplication(ApplicationId appId, // To serialize with FairScheduler#allocate, synchronize on app attempt synchronized (attempt) { FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); - FSLeafQueue targetQueue = queueMgr.getLeafQueue(queueName, false); + String destQueueName = handleMoveToPlanQueue(queueName); + FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false); if (targetQueue == null) { throw new YarnException("Target queue " + queueName + " not found or is not a leaf queue."); @@ -1577,4 +1632,45 @@ public synchronized void updateNodeResource(RMNode nm, } return planQueues; } + + @Override + public synchronized void setEntitlement(String queueName, + QueueEntitlement entitlement) throws YarnException { + + FSLeafQueue reservationQueue = queueMgr.getLeafQueue(queueName, false); + if (reservationQueue == null) { + throw new YarnException("Target queue " + queueName + + " not found or is not a leaf queue."); + } + + reservationQueue.setWeights(entitlement.getCapacity()); + + // TODO Does MaxCapacity need to be set for fairScheduler ? + } + + /** + * Only supports removing empty leaf queues + * @param queueName name of queue to remove + * @throws YarnException if queue to remove is either not a leaf or if its + * not empty + */ + @Override + public synchronized void removeQueue(String queueName) throws YarnException { + FSLeafQueue reservationQueue = queueMgr.getLeafQueue(queueName, false); + if (reservationQueue != null) { + if (!queueMgr.removeLeafQueue(queueName)) { + throw new YarnException("Could not remove queue " + queueName + " as " + + "its either not a leaf queue or its not empty"); + } + } + } + + private String handleMoveToPlanQueue(String targetQueueName) { + FSQueue dest = queueMgr.getQueue(targetQueueName); + if (dest != null && allocConf.isReservable(dest.getQueueName())) { + // use the default child reservation queue of the plan + targetQueueName = getDefaultQueueForPlanQueue(targetQueueName); + } + return targetQueueName; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 0e625d7..27e571e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -91,7 +91,18 @@ public FSLeafQueue getLeafQueue(String name, boolean create) { } return (FSLeafQueue) queue; } - + + /** + * Remove a leaf queue if empty + * @param name name of the queue + * @return true if queue was removed or false otherwise + */ + public boolean removeLeafQueue(String name) { + name = ensureRootPrefix(name); + return removeEmptyIncompatibleQueues(name, FSQueueType.PARENT); + } + + /** * Get a parent queue by name, creating it if the create param is true and is necessary. * If the queue is not or can not be a parent queue, i.e. it already exists as a diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java index 747a4c2..cf7f84e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; @@ -102,4 +103,14 @@ public void setReservationAdmissionPolicy(String reservationAdmissionPolicy) { public void setReservationAgent(String reservationAgent) { this.reservationAgent = reservationAgent; } + + @VisibleForTesting + public void setReservationWindow(long reservationWindow) { + this.reservationWindow = reservationWindow; + } + + @VisibleForTesting + public void setAverageCapacity(int averageCapacity) { + this.avgOverTimeMultiplier = averageCapacity; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java index 82ba731..f294eaf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java @@ -18,14 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase; @@ -38,15 +35,16 @@ import java.io.File; import java.io.IOException; -import static org.mockito.Mockito.when; - -public class TestFairReservationSystem extends FairSchedulerTestBase { - private final static String ALLOC_FILE = new File(TEST_DIR, +public class TestFairReservationSystem { + private final static String ALLOC_FILE = new File(FairSchedulerTestBase. + TEST_DIR, TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath(); + private Configuration conf; + private FairScheduler scheduler; + private FairSchedulerTestBase testHelper = new FairSchedulerTestBase(); - @Override protected Configuration createConfiguration() { - Configuration conf = super.createConfiguration(); + Configuration conf = testHelper.createConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, ResourceScheduler.class); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); @@ -60,10 +58,6 @@ public void setup() throws IOException { @After public void teardown() { - if (resourceManager != null) { - resourceManager.stop(); - resourceManager = null; - } conf = null; } @@ -75,7 +69,8 @@ public void testFairReservationSystemInitialize() throws IOException { // Setup RMContext mockRMContext = testUtil.createRMContext(conf); - setupFairScheduler(testUtil, mockRMContext); + scheduler = ReservationSystemTestUtil.setupFairScheduler(testUtil, + mockRMContext, conf, 10); FairReservationSystem reservationSystem = new FairReservationSystem(); reservationSystem.setRMContext(mockRMContext); @@ -97,14 +92,15 @@ public void testFairReservationSystemReinitialize() throws IOException { ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); // Setup - RMContext mockContext = testUtil.createRMContext(conf); - setupFairScheduler(testUtil, mockContext); + RMContext mockRMContext = testUtil.createRMContext(conf); + scheduler = ReservationSystemTestUtil.setupFairScheduler(testUtil, + mockRMContext, conf, 10); FairReservationSystem reservationSystem = new FairReservationSystem(); - reservationSystem.setRMContext(mockContext); + reservationSystem.setRMContext(mockRMContext); try { - reservationSystem.reinitialize(scheduler.getConf(), mockContext); + reservationSystem.reinitialize(scheduler.getConf(), mockRMContext); } catch (YarnException e) { Assert.fail(e.getMessage()); } @@ -116,10 +112,10 @@ public void testFairReservationSystemReinitialize() throws IOException { // Dynamically add a plan ReservationSystemTestUtil.updateFSAllocationFile(ALLOC_FILE); - scheduler.reinitialize(conf, mockContext); + scheduler.reinitialize(conf, mockRMContext); try { - reservationSystem.reinitialize(conf, mockContext); + reservationSystem.reinitialize(conf, mockRMContext); } catch (YarnException e) { Assert.fail(e.getMessage()); } @@ -129,23 +125,4 @@ public void testFairReservationSystemReinitialize() throws IOException { (reservationSystem, newQueue); } - private void setupFairScheduler(ReservationSystemTestUtil testUtil, - RMContext rmContext) throws - IOException { - - scheduler = new FairScheduler(); - scheduler.setRMContext(rmContext); - - int numContainers = 10; - when(rmContext.getScheduler()).thenReturn(scheduler); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, rmContext); - - Resource resource = testUtil.calculateClusterResource(numContainers); - RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java new file mode 100644 index 0000000..d5778f4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java @@ -0,0 +1,204 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.Clock; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.mockito.Matchers; +import org.mockito.Mockito; + +public class TestFairSchedulerPlanFollower extends + TestSchedulerPlanFollowerBase { + private final static String ALLOC_FILE = new File(FairSchedulerTestBase. + TEST_DIR, + TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath(); + private RMContext rmContext; + private RMContext spyRMContext; + private FairScheduler fs; + private Configuration conf; + private FairSchedulerTestBase testHelper = new FairSchedulerTestBase(); + + @Rule + public TestName name = new TestName(); + + protected Configuration createConfiguration() { + Configuration conf = testHelper.createConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + return conf; + } + + @Before + public void setUp() throws Exception { + conf = createConfiguration(); + ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE); + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + + // Setup + rmContext = TestUtils.getMockRMContext(); + spyRMContext = spy(rmContext); + fs = ReservationSystemTestUtil.setupFairScheduler(testUtil, + spyRMContext, conf, 125); + scheduler = fs; + + ConcurrentMap spyApps = + spy(new ConcurrentHashMap()); + RMApp rmApp = mock(RMApp.class); + when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any())) + .thenReturn(null); + Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any()); + when(spyRMContext.getRMApps()).thenReturn(spyApps); + + ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE); + setupPlanFollower(); + } + + private void setupPlanFollower() throws Exception { + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + mClock = mock(Clock.class); + mAgent = mock(ReservationAgent.class); + + String reservationQ = testUtil.getFullReservationQueueName(); + AllocationConfiguration allocConf = fs.getAllocationConfiguration(); + allocConf.setReservationWindow(20L); + allocConf.setAverageCapacity(20); + policy.init(reservationQ, allocConf); + } + + @Test + public void testWithMoveOnExpiry() throws PlanningException, + InterruptedException, AccessControlException { + // invoke plan follower test with move + testPlanFollower(true); + } + + @Test + public void testWithKillOnExpiry() throws PlanningException, + InterruptedException, AccessControlException { + // invoke plan follower test with kill + testPlanFollower(false); + } + + @Override + protected void verifyCapacity(Queue defQ) { + assertTrue(((FSQueue) defQ).getWeights().getWeight(ResourceType.MEMORY) > + 0.9); + } + + @Override + protected Queue getDefaultQueue() { + return getReservationQueue("dedicated" + + PlanQueue.DEFAULT_QUEUE_SUFFIX); + } + + @Override + protected int getNumberOfApplications(Queue queue) { + int numberOfApplications = fs.getAppsInQueue(queue.getQueueName()).size(); + return numberOfApplications; + } + + @Override + protected AbstractSchedulerPlanFollower createPlanFollower() { + FairSchedulerPlanFollower planFollower = + new FairSchedulerPlanFollower(); + planFollower.init(mClock, scheduler, Collections.singletonList(plan)); + return planFollower; + } + + @Override + protected void assertReservationQueueExists(ReservationId r) { + Queue q = getReservationQueue(r.toString()); + assertNotNull(q); + } + + @Override + protected void assertReservationQueueExists(ReservationId r, + double expectedCapacity, double expectedMaxCapacity) { + FSLeafQueue q = fs.getQueueManager().getLeafQueue(plan.getQueueName() + "" + + "." + + r, false); + assertNotNull(q); + // For now we are setting both to same weight + Assert.assertEquals(expectedCapacity, q.getWeights().getWeight + (ResourceType.MEMORY), 0.01); + } + + @Override + protected void assertReservationQueueDoesNotExist(ReservationId r) { + Queue q = getReservationQueue(r.toString()); + assertNull(q); + } + + @Override + protected Queue getReservationQueue(String r) { + return fs.getQueueManager().getLeafQueue(plan.getQueueName() + "" + + "." + + r, false); + } + + public static ApplicationACLsManager mockAppACLsManager() { + Configuration conf = new Configuration(); + return new ApplicationACLsManager(conf); + } + + @After + public void tearDown() throws Exception { + if (scheduler != null) { + fs.stop(); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 7b6aaf3..8656175 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -60,7 +60,7 @@ public void tick(int seconds) { } } - protected final static String TEST_DIR = + public final static String TEST_DIR = new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); private static RecordFactory @@ -74,7 +74,7 @@ public void tick(int seconds) { protected ResourceManager resourceManager; // Helper methods - protected Configuration createConfiguration() { + public Configuration createConfiguration() { Configuration conf = new YarnConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, ResourceScheduler.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 903c7af..458b06d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -58,8 +58,7 @@ public void resetLastPreemptResources() { } } - @Override - protected Configuration createConfiguration() { + public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, StubbedFairScheduler.class, ResourceScheduler.class);