diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationDefinition.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationDefinition.java index 4d63957..dde962c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationDefinition.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationDefinition.java @@ -54,9 +54,8 @@ public static ReservationDefinition newInstance(long arrival, long deadline, @Unstable public static ReservationDefinition newInstance(long arrival, long deadline, ReservationRequests reservationRequests, String name, int priority) { - ReservationDefinition rDefinition = newInstance(arrival, deadline, - reservationRequests, name, "0", priority); - return rDefinition; + return newInstance(arrival, deadline, reservationRequests, name, "0", + priority); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index 8769ca1..5ca2c33 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -34,10 +34,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.PriorityReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager; import org.apache.hadoop.yarn.util.Clock; @@ -409,9 +411,15 @@ protected Plan initializePlan(String planQueueName) throws YarnException { Resource maxAllocation = getMaxAllocation(); ResourceCalculator rescCalc = getResourceCalculator(); Resource totCap = getPlanQueueCapacity(planQueueName); + ReservationAgent agent = getAgent(planQueuePath); + if (conf.getBoolean(CapacitySchedulerConfiguration + .ENABLE_RESERVATION_PRIORITY, CapacitySchedulerConfiguration + .DEFAULT_ENABLE_RESERVATION_PRIORITY)) { + agent = getPriorityReservationAgent(planQueueName, agent); + } Plan plan = new InMemoryPlan(getRootQueueMetrics(), adPolicy, - getAgent(planQueuePath), totCap, planStepSize, rescCalc, + agent, totCap, planStepSize, rescCalc, minAllocation, maxAllocation, planQueueName, getReplanner(planQueuePath), getReservationSchedulerConfiguration() .getMoveOnExpiry(planQueuePath), rmContext); @@ -449,19 +457,33 @@ protected ReservationAgent getAgent(String queueName) { String agentClassName = reservationConfig.getReservationAgent(queueName); LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName); try { - Class agentClazz = conf.getClassByName(agentClassName); - if (ReservationAgent.class.isAssignableFrom(agentClazz)) { - return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf); - } else { - throw new YarnRuntimeException("Class: " + agentClassName - + " not instance of " + ReservationAgent.class.getCanonicalName()); - } + return (ReservationAgent) getObject(ReservationAgent.class, + agentClassName); } catch (ClassNotFoundException e) { throw new YarnRuntimeException("Could not instantiate Agent: " + agentClassName + " for queue: " + queueName, e); } } + protected ReservationAgent getPriorityReservationAgent(String queueName, + ReservationAgent workerAgent) { + ReservationSchedulerConfiguration reservationConfig = + getReservationSchedulerConfiguration(); + String agentClassName = reservationConfig.getPriorityReservationAgent( + queueName); + LOG.info("Using Priority Reservation Agent: " + agentClassName + " for " + + "queue: " + queueName); + try { + PriorityReservationAgent agent = (PriorityReservationAgent) getObject( + PriorityReservationAgent.class, agentClassName); + agent.setAgent(workerAgent); + return agent; + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate Priority Agent: " + + agentClassName + " for queue: " + queueName, e); + } + } + protected SharingPolicy getAdmissionPolicy(String queueName) { ReservationSchedulerConfiguration reservationConfig = getReservationSchedulerConfiguration(); @@ -470,21 +492,25 @@ protected SharingPolicy getAdmissionPolicy(String queueName) { LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName + " for queue: " + queueName); try { - Class admissionPolicyClazz = - conf.getClassByName(admissionPolicyClassName); - if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) { - return (SharingPolicy) ReflectionUtils.newInstance( - admissionPolicyClazz, conf); - } else { - throw new YarnRuntimeException("Class: " + admissionPolicyClassName - + " not instance of " + SharingPolicy.class.getCanonicalName()); - } + return (SharingPolicy) getObject(SharingPolicy.class, + admissionPolicyClassName); } catch (ClassNotFoundException e) { throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: " + admissionPolicyClassName + " for queue: " + queueName, e); } } + protected Object getObject(Class baseClazz, String className) throws + ClassNotFoundException { + Class clazz = conf.getClassByName(className); + if (baseClazz.isAssignableFrom(clazz)) { + return ReflectionUtils.newInstance(clazz, conf); + } else { + throw new YarnRuntimeException("Class: " + className + + " not instance of " + baseClazz.getCanonicalName()); + } + } + public ReservationsACLsManager getReservationsACLsManager() { return this.reservationsACLsManager; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java index 740b88c..a15a0b7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java @@ -40,6 +40,11 @@ "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy"; @InterfaceAudience.Private + public static final String DEFAULT_PRIORITY_RESERVATION_AGENT_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning" + + ".SimplePriorityReservationAgent"; + + @InterfaceAudience.Private public static final String DEFAULT_RESERVATION_PLANNER_NAME = "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.SimpleCapacityReplanner"; @@ -133,6 +138,16 @@ public String getReservationAgent(String queue) { } /** + * Gets the name of the {@code PriorityReservationAgent} class associated with + * the queue + * @param queue name of the queue + * @return the class name of the {@code PriorityReservationAgent} + */ + public String getPriorityReservationAgent(String queue) { + return DEFAULT_PRIORITY_RESERVATION_AGENT_NAME; + } + + /** * Checks whether the reservation queues be hidden or visible * @param queuePath name of the queue * @return true if reservation queues should be visible diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PriorityReservationAgent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PriorityReservationAgent.java new file mode 100644 index 0000000..fd9779b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PriorityReservationAgent.java @@ -0,0 +1,139 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +import java.util.List; + +public abstract class PriorityReservationAgent implements ReservationAgent { + + private ReservationAgent agent; + + private static final Log LOG = + LogFactory.getLog(PriorityReservationAgent.class.getName()); + + /** + * Accommodate for an incoming reservation by attempting to remove other + * reservations in the queue. + * + * @param reservationId the identifier of the reservation to be accommodated + * for. + * @param user the user who the reservation belongs to + * @param plan the Plan to which the reservation must be fitted + * @param contract encapsulates the resources the user requires for his + * reservation + * + * @return an ordered list of {@link ReservationAllocation} that were removed + * in order to fit the incoming reservation. + * @throws PlanningException if the reservation cannot be fitted into the plan + */ + public abstract List accommodateForReservation( + ReservationId reservationId, String user, Plan plan, + ReservationDefinition contract) throws PlanningException; + + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + try { + agent.createReservation(reservationId, user, plan, contract); + return true; + } catch (PlanningException e) { + LOG.info("Encountered planning exception for reservation=[" + + reservationId.toString() + "] in plan=[" + plan.getQueueName() + "]" + + " when creating the reservation. Attempt to accommodate for " + + "reservation by removing lower priority reservations. Exception=[" + + e.getMessage() + "]"); + } + List yieldedReservations = + accommodateForReservation(reservationId, user, plan, contract); + + try { + return agent.createReservation(reservationId, user, plan, contract); + } catch (PlanningException e) { + LOG.info("Reservation=[" + reservationId + "] could not be added even " + + "after removing lower priority reservations. Attempt to re-add the " + + "removed reservations."); + throw e; + } finally { + addYieldedReservations(yieldedReservations, plan, reservationId); + } + } + + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + try { + agent.updateReservation(reservationId, user, plan, contract); + return true; + } catch (PlanningException e) { + LOG.info("Encountered planning exception for reservation=[" + + reservationId.toString() + "] in plan=[" + plan.getQueueName() + "]" + + " when creating the reservation. Attempt to accommodate for " + + "reservation by removing lower priority reservations. Exception=[" + + e.getMessage() + "]"); + } + List yieldedReservations = + accommodateForReservation(reservationId, user, plan, contract); + + try { + return agent.updateReservation(reservationId, user, plan, contract); + } catch (PlanningException e) { + LOG.info("Reservation=[" + reservationId + "] could not be added even " + + "after removing lower priority reservations. Attempt to re-add the " + + "removed reservations."); + throw e; + } finally { + addYieldedReservations(yieldedReservations, plan, reservationId); + } + } + + private void addYieldedReservations(List reservations, + Plan plan, ReservationId reservationId) { + for (ReservationAllocation reservation : reservations) { + try { + agent.createReservation(reservation.getReservationId(), + reservation.getUser(), plan, + reservation.getReservationDefinition()); + } catch (PlanningException e) { + LOG.info("Reservation=[" + reservation.getReservationId() + "] was " + + "removed to make room for a higher priority reservation=[" + + reservationId + "]."); + } + } + } + + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + return agent.deleteReservation(reservationId, user, plan); + } + + public ReservationAgent getAgent() { + return agent; + } + + public void setAgent(ReservationAgent newAgent) { + agent = newAgent; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimplePriorityReservationAgent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimplePriorityReservationAgent.java new file mode 100644 index 0000000..6bda235 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimplePriorityReservationAgent.java @@ -0,0 +1,85 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Set; + +public class SimplePriorityReservationAgent extends PriorityReservationAgent { + + public SimplePriorityReservationAgent() { + } + + public List accommodateForReservation( + ReservationId reservationId, String user, Plan plan, + ReservationDefinition contract) throws PlanningException { + + Set reservations = plan.getAllReservations(); + List yieldedReservations = new ArrayList<>(); + + for (ReservationAllocation reservation : reservations) { + if (contract.getPriority() > reservation.getReservationDefinition() + .getPriority()) { + yieldedReservations.add(reservation); + plan.deleteReservation(reservation.getReservationId()); + } + } + + yieldedReservations.sort(new ReservationComparator()); + return yieldedReservations; + } + + private class ReservationComparator + implements Comparator { + + public int compare(ReservationAllocation reservationA, + ReservationAllocation reservationB) { + ReservationDefinition definitionA = + reservationA.getReservationDefinition(); + ReservationDefinition definitionB = + reservationB.getReservationDefinition(); + if (definitionA.getPriority() == definitionB.getPriority()) { + // The arrival is compared in opposite order of priority because + // higher number for priority indicates a reservation with a higher + // priority. Conversely, a reservation with an earlier arrival time + // will take precedence over a reservation that arrives later. + return compare(definitionA.getArrival(), definitionB.getArrival()); + } + return compare(definitionB.getPriority(), definitionA.getPriority()); + } + + public int compare(long a, long b) { + return a > b ? 1 : (a < b ? -1 : 0); + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index d5d1374..e7ad471 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -246,6 +246,15 @@ public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE = "show-reservations-as-queues"; + @Private public static final String ENABLE_RESERVATION_PRIORITY = + "enable-reservation-priority"; + @Private public static final boolean DEFAULT_ENABLE_RESERVATION_PRIORITY = + false; + + @Private + public static final String PRIORITY_RESERVATION_AGENT_NAME = + "priority-reservation-agent"; + @Private public static final String RESERVATION_PLANNER_NAME = "reservation-planner"; @@ -907,6 +916,20 @@ public void setReservationAgent(String queue, String reservationPolicy) { } @Override + public String getPriorityReservationAgent(String queue) { + String priorityReservationAgent = + get(getQueuePrefix(queue) + PRIORITY_RESERVATION_AGENT_NAME, + DEFAULT_PRIORITY_RESERVATION_AGENT_NAME); + return priorityReservationAgent; + } + + public void setPriorityReservationAgent(String queue, + String reservationPolicy) { + set(getQueuePrefix(queue) + PRIORITY_RESERVATION_AGENT_NAME, + reservationPolicy); + } + + @Override public boolean getShowReservationAsQueues(String queuePath) { boolean showReservationAsQueues = getBoolean(getQueuePrefix(queuePath) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 059f55c..3391a99 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -177,11 +177,13 @@ public static FairScheduler setupFairScheduler(RMContext rmContext, public static ReservationDefinition createSimpleReservationDefinition( long arrival, long deadline, long duration) { - return createSimpleReservationDefinition(arrival, deadline, duration, 1); + return createSimpleReservationDefinition(arrival, deadline, duration, 1, + 0); } public static ReservationDefinition createSimpleReservationDefinition( - long arrival, long deadline, long duration, int parallelism) { + long arrival, long deadline, long duration, int parallelism, + int priority) { // create a request with a single atomic ask ReservationRequest r = ReservationRequest.newInstance(Resource.newInstance(1024, 1), @@ -193,6 +195,7 @@ public static ReservationDefinition createSimpleReservationDefinition( rDef.setReservationRequests(reqs); rDef.setArrival(arrival); rDef.setDeadline(deadline); + rDef.setPriority(priority); return rDef; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimplePriorityAgent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimplePriorityAgent.java new file mode 100644 index 0000000..fbc9332 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimplePriorityAgent.java @@ -0,0 +1,540 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.Before; +import org.junit.Test; +import org.mortbay.log.Log; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class TestSimplePriorityAgent { + + PriorityReservationAgent agent; + InMemoryPlan plan; + Resource minAlloc = Resource.newInstance(1024, 1); + ResourceCalculator res = new DefaultResourceCalculator(); + Resource maxAlloc = Resource.newInstance(1024 * 8, 8); + Random rand = new Random(); + int numContainers = 100; + long step; + + public TestSimplePriorityAgent() { + } + + @Before + public void setup() throws Exception { + + long seed = rand.nextLong(); + rand.setSeed(seed); + Log.info("Running with seed: " + seed); + + // setting completely loose quotas + long timeWindow = 1000000L; + Resource clusterCapacity = + Resource.newInstance(numContainers * 1024, numContainers); + String reservationQ = + ReservationSystemTestUtil.getFullReservationQueueName(); + + float instConstraint = 100; + float avgConstraint = 100; + + ReservationSchedulerConfiguration conf = ReservationSystemTestUtil + .createConf(reservationQ, timeWindow, instConstraint, avgConstraint); + CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + policy.init(reservationQ, conf); + + ReservationAgent workerAgent = new GreedyReservationAgent(conf); + agent = new SimplePriorityReservationAgent(); + agent.setAgent(workerAgent); + + conf = ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); + policy = new CapacityOverTimePolicy(); + policy.init(reservationQ, conf); + step = 1000L; + + QueueMetrics queueMetrics = mock(QueueMetrics.class); + RMContext context = ReservationSystemTestUtil.createMockRMContext(); + + plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, + res, minAlloc, maxAlloc, "dedicated", null, true, context); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitSimple() throws PlanningException { + boolean result = submitReservation(1, numContainers / 2) != null; + + // validate results, we expect the second one to be accepted + assertTrue("Reservation should succeed if it can fit.", result); + assertTrue("The first reservation is expected to succeed.", + plan.getAllReservations().size() == 1); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitWithSamePriorityNoFit() throws PlanningException { + submitReservation(1, numContainers / 2); + submitReservation(1, numContainers / 2); + boolean result = submitReservation(1, numContainers / 2) == null; + + // validate results, we expect the second one to be accepted + assertTrue("Reservation should fail if it cannot fit and there are no " + + "other reservations that have a lower priority.", result); + assertTrue("The first two reservations are expected to succeed.", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitWithHigerPriorityNoFit() throws PlanningException { + ReservationId reservation1 = submitReservation(1, numContainers / 2); + submitReservation(2, numContainers / 2); + ReservationId reservation3 = submitReservation(3, numContainers / 2); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation should succeed because lower priority " + + "reservations exist.", reservation3 != null); + assertTrue("The lowest priority reservation should be removed.", + plan.getReservationById(reservation1) == null); + assertTrue("There should only be two reservations in the plan.", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitMultipleLowerPriorityExistsNoFit() + throws PlanningException { + ReservationId reservation1 = submitReservation(1, numContainers / 2); + ReservationId reservation2 = submitReservation(2, numContainers / 2); + ReservationId reservation3 = submitReservation(3, numContainers); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation should succeed because lower priority " + + "reservations exist.", reservation3 != null); + assertTrue("The two lowest priority reservations should be removed.", + plan.getReservationById(reservation1) == null); + assertTrue("The two lowest priority reservations should be removed.", + plan.getReservationById(reservation2) == null); + assertTrue("There should only be one reservation in the plan.", + plan.getAllReservations().size() == 1); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitRemovalInPriorityThenArrivalOrder() + throws PlanningException { + ReservationId reservation1 = + submitReservation(1, numContainers / 2, 3 * step, 7 * step, 4 * step); + submitReservation(1, numContainers / 2, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(3, numContainers / 2, 2 * step, 6 * step, 4 * step); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation should succeed because lower priority " + + "reservations exist.", reservation3 != null); + assertTrue( + "The reservation with the lowest priority and highest arrival " + + "time should be removed.", + plan.getReservationById(reservation1) == null); + assertTrue("There should only be two reservations in the plan.", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitRemovalInPriorityThenArrivalOrderUnlessNoFit() + throws PlanningException { + submitReservation(1, (int) (0.4 * numContainers), 3 * step, 7 * step, + 4 * step); + // This should get removed despite the earlier arrival time because it + // cannot fit with the higher priority reservation. + ReservationId reservation2 = submitReservation(1, + (int) (0.6 * numContainers), step, 5 * step, 4 * step); + ReservationId reservation3 = submitReservation(3, + (int) (0.5 * numContainers), 2 * step, 6 * step, 4 * step); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation should succeed because lower priority " + + "reservations exist.", reservation3 != null); + assertTrue( + "The reservation with the lowest priority and highest arrival " + + "time should be removed.", + plan.getReservationById(reservation2) == null); + assertTrue("There should only be two reservations in the plan.", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitLowerPriorityRemovedRegardlessOfStartTime() + throws PlanningException { + // Lower priority than reservation 3, but later start time than the + // second reservation. + submitReservation(2, numContainers / 2, 3 * step, 7 * step, 4 * step); + + // Lowest priority, but earliest startTime. + ReservationId reservation2 = + submitReservation(1, numContainers / 2, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(3, numContainers / 2, 2 * step, 6 * step, 4 * step); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation should succeed because lower priority " + + "reservations exist.", reservation3 != null); + assertTrue( + "The reservation with the lowest priority time should be removed " + + "regardless of the startTime.", + plan.getReservationById(reservation2) == null); + assertTrue("There should only be two reservations in the plan.", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitOnlyLowerPriorityReservationsGetRemoved() + throws PlanningException { + submitReservation(4, numContainers / 2, 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = + submitReservation(1, numContainers / 2, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(3, numContainers / 2, 2 * step, 6 * step, 4 * step); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation should succeed because lower priority " + + "reservations exist.", reservation3 != null); + assertTrue("The reservation with the lowest priority should be removed.", + plan.getReservationById(reservation2) == null); + assertTrue("There should only be two reservations in the plan.", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitSuccessWithHigherPriorityFit() + throws PlanningException { + submitReservation(1, numContainers / 2); + boolean result = submitReservation(2, numContainers / 2) == null; + + // validate results, we expect the second one to be accepted + assertFalse("Reservation should succeed if it can fit.", result); + assertTrue("The first two reservations are expected to succeed.", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitFailureWithLowerPriorityNoFit() + throws PlanningException { + submitReservation(2, numContainers / 2); + submitReservation(2, numContainers / 2); + boolean result = submitReservation(1, numContainers / 2) == null; + + // validate results, we expect the second one to be accepted + assertTrue("Reservation should fail if it cannot fit and there are no " + + "other reservations that have a lower priority.", result); + assertTrue("The first two reservations are expected to succeed", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateSimple() throws PlanningException { + ReservationId reservation1 = submitReservation(1, numContainers / 2); + boolean result = updateReservation(reservation1, numContainers); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation update should succeed if it can fit.", result); + assertTrue("The first reservation is expected to succeed.", + plan.getAllReservations().size() == 1); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateWithSamePriorityNoFit() throws PlanningException { + submitReservation(1, numContainers / 2); + ReservationId reservation2 = submitReservation(1, numContainers / 2); + boolean result = updateReservation(reservation2, numContainers); + + // validate results, we expect the second one to be accepted + assertFalse( + "Reservation update should fail if the new contract will not " + + "fit and there are no other reservations that have a lower priority.", + result); + assertTrue("The first two reservations are expected to succeed.", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateWithHigherPriorityNoFit() throws PlanningException { + ReservationId reservation1 = submitReservation(1, numContainers / 2); + ReservationId reservation2 = submitReservation(3, numContainers / 2); + boolean result = updateReservation(reservation2, numContainers); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation update should succeed because the reservation is " + + "highest priority.", result); + assertTrue("The lowest priority reservation should be removed.", + plan.getReservationById(reservation1) == null); + assertTrue("There should only be 1 reservations in the plan.", + plan.getAllReservations().size() == 1); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateMultipleLowerPriorityExistsNoFit() + throws PlanningException { + int containers = (int) (0.3 * numContainers); + ReservationId reservation1 = submitReservation(1, containers); + ReservationId reservation2 = submitReservation(2, containers); + ReservationId reservation3 = submitReservation(3, containers); + + boolean result = updateReservation(reservation3, numContainers); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertTrue("The two lowest priority reservations should be removed.", + plan.getReservationById(reservation1) == null); + assertTrue("The two lowest priority reservations should be removed.", + plan.getReservationById(reservation2) == null); + assertTrue("There should only be one reservation in the plan.", + plan.getAllReservations().size() == 1); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateRemovalInPriorityThenArrivalOrder() + throws PlanningException { + int containers = (int) (0.3 * numContainers); + ReservationId reservation1 = + submitReservation(1, containers, 3 * step, 7 * step, 4 * step); + submitReservation(1, containers, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(3, containers, 2 * step, 6 * step, 4 * step); + + int newContainers = (int) (0.5 * numContainers); + boolean result = updateReservation(reservation3, newContainers); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertTrue( + "The reservation with the lowest priority and highest arrival " + + "time should be removed.", + plan.getReservationById(reservation1) == null); + assertTrue("There should only be two reservations in the plan.", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateRemovalInPriorityThenArrivalOrderUnlessNoFit() + throws PlanningException { + submitReservation(1, (int) (0.2 * numContainers), 3 * step, 7 * step, + 4 * step); + // This should get removed despite the earlier arrival time because it + // cannot fit with the higher priority reservation. + ReservationId reservation2 = submitReservation(1, + (int) (0.6 * numContainers), step, 5 * step, 4 * step); + ReservationId reservation3 = submitReservation(3, + (int) (0.2 * numContainers), 2 * step, 6 * step, 4 * step); + + boolean result = + updateReservation(reservation3, (int) (0.7 * numContainers)); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation should succeed because lower priority " + + "reservations exist.", reservation3 != null); + assertTrue( + "The reservation with the lowest priority and highest arrival " + + "time should be removed.", + plan.getReservationById(reservation2) == null); + assertTrue("There should only be two reservations in the plan.", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateLowerPriorityRemovedRegardlessOfStartTime() + throws PlanningException { + int containers = (int) (0.3 * numContainers); + + // Lower priority than reservation 3, but later start time than the + // second reservation. + submitReservation(2, containers, 3 * step, 7 * step, 4 * step); + + // Lowest priority, but earliest startTime. + ReservationId reservation2 = + submitReservation(1, containers, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(3, containers, 2 * step, 6 * step, 4 * step); + + boolean result = updateReservation(reservation3, numContainers / 2); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertTrue( + "The reservation with the lowest priority time should be removed " + + "regardless of the startTime.", + plan.getReservationById(reservation2) == null); + assertTrue("There should only be two reservations in the plan.", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateOnlyLowerPriorityReservationsGetRemoved() + throws PlanningException { + int containers = (int) (0.3 * numContainers); + + submitReservation(4, containers, 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = + submitReservation(1, containers, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(3, containers, 2 * step, 6 * step, 4 * step); + + boolean result = updateReservation(reservation3, numContainers / 2); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertTrue("The reservation with the lowest priority should be removed.", + plan.getReservationById(reservation2) == null); + assertTrue("There should only be two reservations in the plan.", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateSuccessWithHigherPriorityFit() + throws PlanningException { + submitReservation(1, numContainers / 2); + ReservationId reservationId2 = submitReservation(2, numContainers / 3); + + boolean result = updateReservation(reservationId2, numContainers / 2); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation should succeed if it can fit.", result); + assertTrue("The first two reservations are expected to succeed.", + plan.getAllReservations().size() == 2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateFailureWithLowerPriorityNoFit() + throws PlanningException { + submitReservation(2, numContainers / 2); + ReservationId reservationId2 = submitReservation(1, numContainers / 2); + + boolean result = updateReservation(reservationId2, numContainers); + + // validate results, we expect the second one to be accepted + assertFalse("Reservation should fail if it cannot fit and there are no " + + "other reservations that have a lower priority.", result); + assertTrue("The first two reservations are expected to succeed", + plan.getAllReservations().size() == 2); + } + + private ReservationId submitReservation(int priority, int containers) { + return submitReservation(priority, containers, step, 2 * step, step); + } + + private ReservationId submitReservation(int priority, int containers, + long arrival, long deadline, long duration) { + // create an ALL request, with an impossible combination, it should be + // rejected, and allocation remain unchanged + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(arrival); + rr.setDeadline(deadline); + rr.setPriority(priority); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); + ReservationRequest r = ReservationRequest + .newInstance(Resource.newInstance(1024, 1), containers, 1, duration); + + List list = new ArrayList<>(); + list.add(r); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + try { + // submit to agent + agent.createReservation(reservationID, "u1", plan, rr); + return reservationID; + } catch (PlanningException p) { + return null; + } + } + + private boolean updateReservation(ReservationId reservationId, + int containers) { + // Create an ALL request, with an impossible combination, it should be + // rejected, and allocation remain unchanged + ReservationDefinition oldContract = + plan.getReservationById(reservationId).getReservationDefinition(); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); + // This should always succeed because reservations that are created in this + // test only have one reservation request. + ReservationRequest r = + oldContract.getReservationRequests().getReservationResources().get(0); + r.setNumContainers(containers); + + List list = new ArrayList<>(); + list.add(r); + reqs.setReservationResources(list); + oldContract.setReservationRequests(reqs); + + try { + // submit to agent + return agent.updateReservation(reservationId, "u1", plan, oldContract); + } catch (PlanningException p) { + return false; + } + } + +}