diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationDefinition.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationDefinition.java index bb9bca2..00b943c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationDefinition.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationDefinition.java @@ -188,7 +188,7 @@ public abstract void setReservationRequests( * non-recurring reservations are only compared with non-recurring * reservations. Likewise for recurring reservations. * - * @return int representing the priority of the reserved resource + * @return {@link Priority} representing the priority of the reserved resource * allocation in the scheduler */ @Public diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationPriorityScope.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationPriorityScope.java new file mode 100644 index 0000000..b1496e8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationPriorityScope.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Enumeration of various scopes that reservation priority applies to. + */ +@Public +@Evolving +public enum ReservationPriorityScope { + /** + * Reservation priority is applied in the scope of the queue. This means + * that the priority of a reservation will have no effect on reservations + * in different queues. + */ + QUEUE, + + /** + * Reservation priority is applied in the scope of the user. This means + * that the priority of a reservation will only be taken into consideration + * for reservations belonging to the same user. + */ + USER + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 19966ad..acf26b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; @@ -1257,9 +1256,7 @@ private ReservationSubmissionRequest submitReservationTestHelper( ReservationId reservationID = client.createReservation().getReservationId(); ReservationSubmissionRequest sRequest = createSimpleReservationRequest( reservationID, 4, arrival, deadline, duration); - ReservationSubmissionResponse sResponse = - client.submitReservation(sRequest); - Assert.assertNotNull(sResponse); + Assert.assertNotNull(client.submitReservation(sRequest)); Assert.assertNotNull(reservationID); System.out.println("Submit reservation response: " + reservationID); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index 8769ca1..2f7dcdc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -34,10 +34,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.PriorityReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager; import org.apache.hadoop.yarn.util.Clock; @@ -409,9 +411,15 @@ protected Plan initializePlan(String planQueueName) throws YarnException { Resource maxAllocation = getMaxAllocation(); ResourceCalculator rescCalc = getResourceCalculator(); Resource totCap = getPlanQueueCapacity(planQueueName); + ReservationAgent agent = getAgent(planQueuePath); + if (conf.getBoolean(CapacitySchedulerConfiguration + .ENABLE_RESERVATION_PRIORITY, CapacitySchedulerConfiguration + .DEFAULT_ENABLE_RESERVATION_PRIORITY)) { + agent = getPriorityReservationAgent(planQueueName, agent); + } Plan plan = new InMemoryPlan(getRootQueueMetrics(), adPolicy, - getAgent(planQueuePath), totCap, planStepSize, rescCalc, + agent, totCap, planStepSize, rescCalc, minAllocation, maxAllocation, planQueueName, getReplanner(planQueuePath), getReservationSchedulerConfiguration() .getMoveOnExpiry(planQueuePath), rmContext); @@ -449,19 +457,33 @@ protected ReservationAgent getAgent(String queueName) { String agentClassName = reservationConfig.getReservationAgent(queueName); LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName); try { - Class agentClazz = conf.getClassByName(agentClassName); - if (ReservationAgent.class.isAssignableFrom(agentClazz)) { - return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf); - } else { - throw new YarnRuntimeException("Class: " + agentClassName - + " not instance of " + ReservationAgent.class.getCanonicalName()); - } + return (ReservationAgent) getObject(ReservationAgent.class, + agentClassName); } catch (ClassNotFoundException e) { throw new YarnRuntimeException("Could not instantiate Agent: " + agentClassName + " for queue: " + queueName, e); } } + protected ReservationAgent getPriorityReservationAgent(String queueName, + ReservationAgent workerAgent) { + ReservationSchedulerConfiguration reservationConfig = + getReservationSchedulerConfiguration(); + String agentClassName = reservationConfig.getPriorityReservationAgent( + queueName); + LOG.info("Using Priority Reservation Agent: " + agentClassName + " for " + + "queue: " + queueName); + try { + PriorityReservationAgent agent = (PriorityReservationAgent) getObject( + PriorityReservationAgent.class, agentClassName); + agent.setAgent(workerAgent); + return agent; + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate Priority Agent: " + + agentClassName + " for queue: " + queueName, e); + } + } + protected SharingPolicy getAdmissionPolicy(String queueName) { ReservationSchedulerConfiguration reservationConfig = getReservationSchedulerConfiguration(); @@ -470,21 +492,25 @@ protected SharingPolicy getAdmissionPolicy(String queueName) { LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName + " for queue: " + queueName); try { - Class admissionPolicyClazz = - conf.getClassByName(admissionPolicyClassName); - if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) { - return (SharingPolicy) ReflectionUtils.newInstance( - admissionPolicyClazz, conf); - } else { - throw new YarnRuntimeException("Class: " + admissionPolicyClassName - + " not instance of " + SharingPolicy.class.getCanonicalName()); - } + return (SharingPolicy) getObject(SharingPolicy.class, + admissionPolicyClassName); } catch (ClassNotFoundException e) { throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: " + admissionPolicyClassName + " for queue: " + queueName, e); } } + protected Object getObject(Class baseClazz, String className) + throws ClassNotFoundException { + Class clazz = conf.getClassByName(className); + if (baseClazz.isAssignableFrom(clazz)) { + return ReflectionUtils.newInstance(clazz, conf); + } else { + throw new YarnRuntimeException("Class: " + className + " not instance of " + + baseClazz.getCanonicalName()); + } + } + public ReservationsACLsManager getReservationsACLsManager() { return this.reservationsACLsManager; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java index 740b88c..cfa98d9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java @@ -40,6 +40,11 @@ "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy"; @InterfaceAudience.Private + public static final String DEFAULT_PRIORITY_RESERVATION_AGENT_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning" + + ".SimplePriorityReservationAgent"; + + @InterfaceAudience.Private public static final String DEFAULT_RESERVATION_PLANNER_NAME = "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.SimpleCapacityReplanner"; @@ -133,6 +138,16 @@ public String getReservationAgent(String queue) { } /** + * Gets the name of the {@code PriorityReservationAgent} class associated with + * the queue. + * @param queue name of the queue + * @return the class name of the {@code PriorityReservationAgent} + */ + public String getPriorityReservationAgent(String queue) { + return DEFAULT_PRIORITY_RESERVATION_AGENT_NAME; + } + + /** * Checks whether the reservation queues be hidden or visible * @param queuePath name of the queue * @return true if reservation queues should be visible diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PriorityReservationAgent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PriorityReservationAgent.java new file mode 100644 index 0000000..ca64276 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PriorityReservationAgent.java @@ -0,0 +1,150 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +import java.util.List; + +/** + * This {@link ReservationAgent} is an abstract agent that wraps other + * ReservationAgents to make them priority aware. + * + * {@link PriorityReservationAgent} will attempt to interact with the plan + * using the inner {@link ReservationAgent}. If this fails, it will attempt to + * accommodate for the reservation based on the method defined in the + * PriorityReservationAgent subclass. + */ +public abstract class PriorityReservationAgent implements ReservationAgent, + Configurable { + + private ReservationAgent agent; + + private static final Log LOG = + LogFactory.getLog(PriorityReservationAgent.class.getName()); + + /** + * Accommodate for an incoming reservation by attempting to remove other + * reservations in the queue. + * + * @param reservationId the identifier of the reservation to be accommodated + * for. + * @param user the user who the reservation belongs to + * @param plan the Plan to which the reservation must be fitted + * @param contract encapsulates the resources the user requires for his + * reservation + * + * @return an ordered list of {@link ReservationAllocation} that were removed + * in order to fit the incoming reservation. + * @throws PlanningException if the reservation cannot be fitted into the plan + */ + public abstract List accommodateForReservation( + ReservationId reservationId, String user, Plan plan, + ReservationDefinition contract) throws PlanningException; + + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + try { + agent.createReservation(reservationId, user, plan, contract); + return true; + } catch (PlanningException e) { + LOG.info("Encountered planning exception for reservation=[" + + reservationId.toString() + "] in plan=[" + plan.getQueueName() + "]" + + " when creating the reservation. Attempt to accommodate for " + + "reservation by removing lower priority reservations. Exception=[" + + e.getMessage() + "]"); + } + List yieldedReservations = + accommodateForReservation(reservationId, user, plan, contract); + + try { + return agent.createReservation(reservationId, user, plan, contract); + } catch (PlanningException e) { + LOG.info("Reservation=[" + reservationId + "] could not be added even " + + "after removing lower priority reservations. Attempt to re-add the " + + "removed reservations."); + throw e; + } finally { + addYieldedReservations(yieldedReservations, plan, reservationId); + } + } + + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + try { + agent.updateReservation(reservationId, user, plan, contract); + return true; + } catch (PlanningException e) { + LOG.info("Encountered planning exception for reservation=[" + + reservationId.toString() + "] in plan=[" + plan.getQueueName() + "]" + + " when creating the reservation. Attempt to accommodate for " + + "reservation by removing lower priority reservations. Exception=[" + + e.getMessage() + "]"); + } + List yieldedReservations = + accommodateForReservation(reservationId, user, plan, contract); + + try { + return agent.updateReservation(reservationId, user, plan, contract); + } catch (PlanningException e) { + LOG.info("Reservation=[" + reservationId + "] could not be added even " + + "after removing lower priority reservations. Attempt to re-add the " + + "removed reservations."); + throw e; + } finally { + addYieldedReservations(yieldedReservations, plan, reservationId); + } + } + + private void addYieldedReservations(List reservations, + Plan plan, ReservationId reservationId) { + for (ReservationAllocation reservation : reservations) { + try { + agent.createReservation(reservation.getReservationId(), + reservation.getUser(), plan, + reservation.getReservationDefinition()); + } catch (PlanningException e) { + LOG.info("Reservation=[" + reservation.getReservationId() + "] was " + + "removed to make room for a higher priority reservation=[" + + reservationId + "]."); + } + } + } + + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + return agent.deleteReservation(reservationId, user, plan); + } + + public ReservationAgent getAgent() { + return agent; + } + + public void setAgent(ReservationAgent newAgent) { + agent = newAgent; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimplePriorityReservationAgent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimplePriorityReservationAgent.java new file mode 100644 index 0000000..996ea89 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimplePriorityReservationAgent.java @@ -0,0 +1,130 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationPriorityScope; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Set; + +/** + * {@link SimplePriorityReservationAgent} employs the strategy of wrapping a + * configured reservation agent to leverage the algorithm for reservation + * submission, deletion, and update. + * + * If the reservation update, or submission fails, + * {@link SimplePriorityReservationAgent} will delete all reservations in the + * configured {@link ReservationPriorityScope} that are strictly lower priority + * than the offending reservation before trying again. After the offending + * reservation action has either succeeded or failed, the + * {@link SimplePriorityReservationAgent} will attempt to re-add all the + * reservations that have been deleted. + */ +public class SimplePriorityReservationAgent extends PriorityReservationAgent { + + private ReservationPriorityScope scope; + private Configuration configuration; + + public SimplePriorityReservationAgent() { + this(new Configuration()); + } + + public SimplePriorityReservationAgent(Configuration conf) { + setConf(conf); + } + + public List accommodateForReservation( + ReservationId reservationId, String user, Plan plan, + ReservationDefinition contract) throws PlanningException { + + Set reservations; + switch (scope) { + case USER: + // Get all reservations belonging to user; + reservations = plan.getReservations(null, null, user); + break; + case QUEUE: + default: + // Get all reservations in the queue. + reservations = plan.getAllReservations(); + break; + } + + List yieldedReservations = new ArrayList<>(); + + for (ReservationAllocation reservation : reservations) { + if (contract.getPriority().getPriority() < + reservation.getReservationDefinition().getPriority().getPriority()) { + yieldedReservations.add(reservation); + plan.deleteReservation(reservation.getReservationId()); + } + } + + yieldedReservations.sort(new ReservationComparator()); + return yieldedReservations; + } + + public Configuration getConf() { + return configuration; + } + + public void setConf(Configuration conf) { + configuration = conf; + reinitialize(conf); + } + + private void reinitialize(Configuration conf) { + scope = conf.getEnum( + CapacitySchedulerConfiguration.RESERVATION_PRIORITY_SCOPE, + CapacitySchedulerConfiguration.DEFAULT_RESERVATION_PRIORITY_SCOPE); + } + + private static class ReservationComparator + implements Comparator, Serializable { + + public int compare(ReservationAllocation reservationA, + ReservationAllocation reservationB) { + ReservationDefinition definitionA = + reservationA.getReservationDefinition(); + ReservationDefinition definitionB = + reservationB.getReservationDefinition(); + if (definitionA.getPriority().getPriority() == definitionB.getPriority() + .getPriority()) { + return compare(definitionA.getArrival(), definitionB.getArrival()); + } + return compare(definitionA.getPriority().getPriority(), + definitionB.getPriority().getPriority()); + } + + public int compare(long a, long b) { + return a > b ? 1 : (a < b ? -1 : 0); + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index d5d1374..b62a4d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.ReservationACL; +import org.apache.hadoop.yarn.api.records.ReservationPriorityScope; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; @@ -246,6 +247,20 @@ public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE = "show-reservations-as-queues"; + @Private public static final String ENABLE_RESERVATION_PRIORITY = + "enable-reservation-priority"; + @Private public static final boolean DEFAULT_ENABLE_RESERVATION_PRIORITY = + false; + + @Private public static final String RESERVATION_PRIORITY_SCOPE = + "reservation-priority-scope"; + @Private public static final ReservationPriorityScope + DEFAULT_RESERVATION_PRIORITY_SCOPE = ReservationPriorityScope.QUEUE; + + @Private + public static final String PRIORITY_RESERVATION_AGENT_NAME = + "priority-reservation-agent"; + @Private public static final String RESERVATION_PLANNER_NAME = "reservation-planner"; @@ -907,6 +922,20 @@ public void setReservationAgent(String queue, String reservationPolicy) { } @Override + public String getPriorityReservationAgent(String queue) { + String priorityReservationAgent = + get(getQueuePrefix(queue) + PRIORITY_RESERVATION_AGENT_NAME, + DEFAULT_PRIORITY_RESERVATION_AGENT_NAME); + return priorityReservationAgent; + } + + public void setPriorityReservationAgent(String queue, + String reservationPolicy) { + set(getQueuePrefix(queue) + PRIORITY_RESERVATION_AGENT_NAME, + reservationPolicy); + } + + @Override public boolean getShowReservationAsQueues(String queuePath) { boolean showReservationAsQueues = getBoolean(getQueuePrefix(queuePath) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 99440a8..563c551 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -2144,7 +2144,8 @@ private ReservationSubmissionRequest createReservationSubmissionRequest( ReservationRequests reqs = ReservationRequests.newInstance(list, resInt); ReservationDefinition rDef = ReservationDefinition.newInstance(resInfo.getArrival(), - resInfo.getDeadline(), reqs, resInfo.getReservationName()); + resInfo.getDeadline(), reqs, resInfo.getReservationName(), "0", + Priority.newInstance(resInfo.getPriority())); ReservationId reservationId = ReservationId.parseReservationId(resContext .getReservationId()); @@ -2257,7 +2258,8 @@ private ReservationUpdateRequest createReservationUpdateRequest( ReservationRequests reqs = ReservationRequests.newInstance(list, resInt); ReservationDefinition rDef = ReservationDefinition.newInstance(resInfo.getArrival(), - resInfo.getDeadline(), reqs, resInfo.getReservationName()); + resInfo.getDeadline(), reqs, resInfo.getReservationName(), "0", + Priority.newInstance(resInfo.getPriority())); ReservationUpdateRequest request = ReservationUpdateRequest.newInstance(rDef, ReservationId .parseReservationId(resContext.getReservationId())); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java index c536d8d..d33766d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java @@ -388,9 +388,9 @@ private ReservationDefinition makeSimpleReservationDefinition() { long arrival = System.currentTimeMillis(); String reservationName = UUID.randomUUID().toString(); - return ReservationDefinition.newInstance - (arrival, arrival + (int)(defaultDuration * 1.1), defaultRequests, - reservationName); + return ReservationDefinition.newInstance(arrival, + arrival + (int) (defaultDuration * 1.1), defaultRequests, + reservationName); } private ReservationListResponse listReservationById(String lister, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 1ff6a1a..149fa14 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -178,11 +178,13 @@ public static FairScheduler setupFairScheduler(RMContext rmContext, public static ReservationDefinition createSimpleReservationDefinition( long arrival, long deadline, long duration) { - return createSimpleReservationDefinition(arrival, deadline, duration, 1); + return createSimpleReservationDefinition(arrival, deadline, duration, 1, + 0); } public static ReservationDefinition createSimpleReservationDefinition( - long arrival, long deadline, long duration, int parallelism) { + long arrival, long deadline, long duration, int parallelism, + int priority) { // create a request with a single atomic ask ReservationRequest r = ReservationRequest.newInstance(Resource.newInstance(1024, 1), @@ -194,6 +196,7 @@ public static ReservationDefinition createSimpleReservationDefinition( rDef.setReservationRequests(reqs); rDef.setArrival(arrival); rDef.setDeadline(deadline); + rDef.setPriority(Priority.newInstance(priority)); return rDef; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimplePriorityReservationAgent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimplePriorityReservationAgent.java new file mode 100644 index 0000000..d09f966 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimplePriorityReservationAgent.java @@ -0,0 +1,707 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationPriorityScope; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +/** + * {@link TestSimplePriorityReservationAgent} verifies the logic of the + * {@link SimplePriorityReservationAgent}. + */ +public class TestSimplePriorityReservationAgent { + + private final String defaultUser = "u1"; + private ReservationSchedulerConfiguration conf; + private PriorityReservationAgent agent; + private InMemoryPlan plan; + private Resource minAlloc = Resource.newInstance(1024, 1); + private ResourceCalculator res = new DefaultResourceCalculator(); + private Resource maxAlloc = Resource.newInstance(1024 * 8, 8); + private Random rand = new Random(); + private int numContainers = 100; + private long step; + + @Before + public void setup() throws Exception { + + float instConstraint = 100; + float avgConstraint = 100; + long timeWindow = 1000000L; + long seed = rand.nextLong(); + rand.setSeed(seed); + step = 1000L; + + QueueMetrics queueMetrics = mock(QueueMetrics.class); + RMContext context = ReservationSystemTestUtil.createMockRMContext(); + Resource clusterCapacity = + Resource.newInstance(numContainers * 1024, numContainers); + String reservationQ = + ReservationSystemTestUtil.getFullReservationQueueName(); + + conf = ReservationSystemTestUtil + .createConf(reservationQ, timeWindow, instConstraint, avgConstraint); + + CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + policy.init(reservationQ, conf); + + ReservationAgent workerAgent = new GreedyReservationAgent(conf); + agent = new SimplePriorityReservationAgent(conf); + agent.setAgent(workerAgent); + + plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, + res, minAlloc, maxAlloc, "dedicated", null, true, context); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitSimple() throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(1, numContainers / 2); + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation2 = submitReservation(1, numContainers / 2); + + // Reservation submission should work with reservation priority enabled. + assertReservationsInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitWithSamePriorityNoFitQueue() throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(1, numContainers / 2); + ReservationId reservation2 = submitReservation(1, numContainers / 2); + ReservationId reservation3 = submitReservation(1, numContainers / 2); + + // Verify that only the first two reservations succeed. + assertReservationsInPlan(reservation1, reservation2); + assertReservationsNotInPlan(reservation3); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitWithSamePriorityNoFitUser() throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation1 = submitReservation(1, numContainers / 2); + ReservationId reservation2 = submitReservation(1, numContainers / 2); + ReservationId reservation3 = submitReservation(1, numContainers / 2); + + // Verify that only the first two reservations succeed. + assertReservationsInPlan(reservation1, reservation2); + assertReservationsNotInPlan(reservation3); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitWithHigherPriorityNoFitQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(3, numContainers / 2); + ReservationId reservation2 = submitReservation(2, numContainers / 2); + + // Verify that the first two reservations succeed. + assertReservationsInPlan(reservation1, reservation2); + + ReservationId reservation3 = submitReservation(1, numContainers / 2, "u4"); + + // Verify that the reservations with the highest priority succeed. + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitWithHigherPriorityNoFitUser() throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation1 = submitReservation(3, numContainers / 2); + ReservationId reservation2 = submitReservation(2, numContainers / 2, "u2"); + + // Verify that the first two reservations succeed. + assertReservationsInPlan(reservation1, reservation2); + + ReservationId reservation3 = submitReservation(1, numContainers / 2, "u2"); + + // Verify that the reservations with the highest priority for each user + // succeeds. + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + + ReservationId reservation4 = submitReservation(1, numContainers / 2); + + // Verify that the reservations with the highest priority for each user + // succeeds. + assertReservationsInPlan(reservation3, reservation4); + assertReservationsNotInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitMultipleLowerPriorityExistsNoFitQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(3, numContainers / 2); + ReservationId reservation2 = submitReservation(2, numContainers / 2); + ReservationId reservation3 = submitReservation(1, numContainers); + + assertReservationsInPlan(reservation3); + assertReservationsNotInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitMultipleLowerPriorityExistsNoFitUser() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation1 = submitReservation(3, numContainers / 2); + ReservationId reservation2 = submitReservation(2, numContainers / 2); + ReservationId reservation3 = submitReservation(1, numContainers, "u2"); + + // Ensure priority has no effect on reservations for different user if + // the scope is set to USER. + assertReservationsInPlan(reservation1, reservation2); + assertReservationsNotInPlan(reservation3); + + ReservationId reservation4 = submitReservation(1, numContainers); + + // Since reservation4 is made by the same user that submitted the + // existing reservations, and is higher priority, other reservations + // should be yielded. + assertReservationsInPlan(reservation4); + assertReservationsNotInPlan(reservation1, reservation2, reservation3); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitRemovalInPriorityThenArrivalOrderQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = + submitReservation(2, numContainers / 2, 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = + submitReservation(2, numContainers / 2, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(1, numContainers / 2, 2 * step, 6 * step, 4 * step); + + // Reservation2 starts earlier than reservation1, and has the same + // priority. This means that reservation1 should be the first to be + // yielded for reservation3. + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1); + + ReservationId reservation4 = + submitReservation(1, numContainers / 2, 2 * step, 6 * step, 4 * step, + "u2"); + + // Reservation4 yields reservation2, because it is higher priority, even + // if it is submitted with a different user. + assertReservationsInPlan(reservation3, reservation4); + assertReservationsNotInPlan(reservation1, reservation2); + + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitRemovalInPriorityThenArrivalOrderUser() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation1 = + submitReservation(3, numContainers / 2, 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = + submitReservation(3, numContainers / 2, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(2, numContainers / 2, 2 * step, 6 * step, 4 * step); + + // Reservation3 will yield reservation1 because it starts earlier + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1); + + ReservationId reservation4 = + submitReservation(2, numContainers / 2, 2 * step, 6 * step, 4 * step, + "u2"); + + // Reservation2 starts earlier than reservation1, and has the same + // priority. This means that reservation1 should be the first to be + // yielded for reservation3. Reservation4 belongs to a different user, so + // it cannot yield other reservations. + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1, reservation4); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitRemovalInArrivalOrderUnlessNoFit() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation( + 2, (int) (0.4 * numContainers), 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = submitReservation( + 2, (int) (0.6 * numContainers), step, 5 * step, 4 * step); + ReservationId reservation3 = submitReservation( + 1, (int) (0.5 * numContainers), 2 * step, 6 * step, 4 * step, "u2"); + + // Reservation2 is deleted despite starting earlier because deleting + // reservation1 would not have caused reservation3 to fit. + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitRemovalInPriorityOrderUnlessNoFit() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation( + 3, (int) (0.4 * numContainers), 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = submitReservation( + 2, (int) (0.6 * numContainers), step, 5 * step, 4 * step); + ReservationId reservation3 = submitReservation( + 1, (int) (0.5 * numContainers), 2 * step, 6 * step, 4 * step, "u2"); + + // Reservation2 is deleted despite having a higher priority because deleting + // reservation1 would not have caused reservation3 to fit. + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitLowerPriorityRemovedRegardlessOfStartTime() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = + submitReservation(2, numContainers / 2, 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = + submitReservation(3, numContainers / 2, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(1, numContainers / 2, 2 * step, 6 * step, 4 * step); + + // Reservation2 is deleted despite starting the earliest, because it has + // lowest priority. + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitOnlyLowerPriorityReservationsGetRemovedQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = + submitReservation(3, numContainers / 2, 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = + submitReservation(1, numContainers / 2, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(2, numContainers / 2, 2 * step, 6 * step, 4 * step); + + // Reservation1 is deleted because it is lowest priority. + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitOnlyLowerPriorityReservationsGetRemovedUser() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation1 = + submitReservation(3, numContainers / 2, step, 7 * step, 4 * step, "u2"); + ReservationId reservation2 = + submitReservation(1, numContainers / 2, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(2, numContainers / 2, 2 * step, 6 * step, 4 * step); + + // Reservation3 is rejected despite there existing a reservation with + // lower priority because it belongs to a different user. + assertReservationsInPlan(reservation1, reservation2); + assertReservationsNotInPlan(reservation3); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitFailureWithLowerPriorityNoFit() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(2, numContainers / 2); + ReservationId reservation2 = submitReservation(2, numContainers / 2); + ReservationId reservation3 = submitReservation(3, numContainers / 2); + + assertReservationsInPlan(reservation1, reservation2); + assertReservationsNotInPlan(reservation3); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateSimple() throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(1, numContainers / 2); + boolean result = updateReservation(reservation1, numContainers); + + assertTrue("Reservation update should succeed if it can fit.", result); + assertReservationsInPlan(reservation1); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateWithSamePriorityNoFit() throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(1, numContainers / 2); + ReservationId reservation2 = submitReservation(1, numContainers / 2); + boolean result = updateReservation(reservation2, numContainers); + + assertFalse("Reservation update should fail if the new contract will not " + + "fit and there are no other reservations that have a lower " + + "priority.", result); + assertReservationsInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateWithHigherPriorityNoFitQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(3, numContainers / 2, "u2"); + ReservationId reservation2 = submitReservation(1, numContainers / 2); + boolean result = updateReservation(reservation2, numContainers); + + assertTrue("Reservation update should succeed because the reservation is " + + "highest priority.", result); + assertReservationsInPlan(reservation2); + assertReservationsNotInPlan(reservation1); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateWithHigherPriorityNoFitUser() throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation1 = submitReservation(3, numContainers / 2, "u2"); + ReservationId reservation2 = submitReservation(1, numContainers / 2); + boolean result = updateReservation(reservation2, numContainers); + + assertFalse("Reservation update should not succeed because the " + + "increasing the reservation size will cause it not to fit.", result); + assertReservationsInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateMultipleLowerPriorityExistsNoFitQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + int containers = (int) (0.3 * numContainers); + ReservationId reservation1 = submitReservation(3, containers); + ReservationId reservation2 = submitReservation(2, containers, "u2"); + ReservationId reservation3 = submitReservation(1, containers); + + boolean result = updateReservation(reservation3, numContainers); + + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertReservationsInPlan(reservation3); + assertReservationsNotInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateMultipleLowerPriorityExistsNoFitUser() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + int containers = (int) (0.3 * numContainers); + ReservationId reservation1 = submitReservation(2, containers); + ReservationId reservation2 = submitReservation(3, containers, "u2"); + ReservationId reservation3 = submitReservation(1, containers); + + boolean result = updateReservation(reservation3, numContainers); + + assertFalse("Reservation update should not succeed because not " + + "enough reservations can be yielded to accept the update.", result); + assertReservationsInPlan(reservation1, reservation2, reservation3); + + result = updateReservation(reservation3, containers * 2); + + assertTrue("Reservation update should succeed because a lower priority " + + "reservation exist.", result); + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateRemovalInPriorityThenArrivalOrderQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + int containers = (int) (0.3 * numContainers); + ReservationId reservation1 = + submitReservation(2, containers, step, 5 * step, 4 * step); + ReservationId reservation2 = + submitReservation(2, containers, 3 * step, 7 * step, 4 * step); + ReservationId reservation3 = + submitReservation(1, containers, 2 * step, 6 * step, 4 * step); + + int newContainers = (int) (0.5 * numContainers); + boolean result = updateReservation(reservation3, newContainers); + + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + } + + @Test + public void testUpdateRemovalInPriorityThenArrivalOrderUser() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + int containers = (int) (0.3 * numContainers); + ReservationId reservation1 = + submitReservation(2, containers, step, 5 * step, 4 * step); + ReservationId reservation2 = + submitReservation(2, containers, 3 * step, 7 * step, 4 * step, "u2"); + ReservationId reservation3 = + submitReservation(1, containers, 2 * step, 6 * step, 4 * step); + + int newContainers = (int) (0.5 * numContainers); + boolean result = updateReservation(reservation3, newContainers); + + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateRemovalInPriorityThenArrivalOrderUnlessNoFit() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation( + 2, (int) (0.2 * numContainers), 3 * step, 7 * step, 4 * step); + // This should get removed despite the earlier arrival time because it + // cannot fit with the higher priority reservation. + ReservationId reservation2 = submitReservation( + 2, (int) (0.6 * numContainers), step, 5 * step, 4 * step); + ReservationId reservation3 = submitReservation( + 1, (int) (0.2 * numContainers), 2 * step, 6 * step, 4 * step); + + boolean result = + updateReservation(reservation3, (int) (0.7 * numContainers)); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateLowerPriorityRemovedRegardlessOfStartTime() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + int containers = (int) (0.3 * numContainers); + + // Lower priority than reservation 3, but later start time than the + // second reservation. + ReservationId reservation1 = + submitReservation(2, containers, 3 * step, 7 * step, 4 * step); + // Lowest priority, but earliest startTime. + ReservationId reservation2 = + submitReservation(3, containers, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(1, containers, 2 * step, 6 * step, 4 * step); + + boolean result = updateReservation(reservation3, numContainers / 2); + + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateOnlyLowerPriorityReservationsGetRemoved() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + int containers = (int) (0.3 * numContainers); + + ReservationId reservation1 = + submitReservation(1, containers, 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = + submitReservation(3, containers, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(2, containers, 2 * step, 6 * step, 4 * step); + + boolean result = updateReservation(reservation3, numContainers / 2); + + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateSuccessWithHigherPriorityFit() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(2, numContainers / 2); + ReservationId reservation2 = submitReservation(1, numContainers / 3); + + boolean result = updateReservation(reservation2, numContainers / 2); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation should succeed if it can fit.", result); + assertReservationsInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateFailureWithLowerPriorityNoFit() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(1, numContainers / 2); + ReservationId reservation2 = submitReservation(2, numContainers / 2); + + boolean result = updateReservation(reservation2, numContainers); + + // validate results, we expect the second one to be accepted + assertFalse("Reservation should fail if it cannot fit and there are no " + + "other reservations that have a lower priority.", result); + assertReservationsInPlan(reservation1, reservation2); + } + + private void setPriorityScope(ReservationPriorityScope scope) { + conf.setEnum(CapacitySchedulerConfiguration.RESERVATION_PRIORITY_SCOPE, + scope); + agent.setConf(conf); + } + + private void assertReservationsNotInPlan(ReservationId... reservationIds) { + for (ReservationId reservationId : reservationIds) { + assertTrue("Reservation ID " + reservationId + " shouldn't exist in plan", + plan.getReservationById(reservationId) == null); + } + } + + private void assertReservationsInPlan(ReservationId... reservationIds) { + for (ReservationId reservationId : reservationIds) { + assertTrue("Reservation ID " + reservationId + " should exist in plan", + plan.getReservationById(reservationId) != null); + } + } + + private ReservationId submitReservation(int priority, int containers) { + return submitReservation(priority, containers, step, 2 * step, step, + defaultUser); + } + + private ReservationId submitReservation(int priority, int containers, + String user) { + return submitReservation(priority, containers, step, 2 * step, step, user); + } + + private ReservationId submitReservation(int priority, int containers, + long arrival, long deadline, long duration) { + return submitReservation(priority, containers, arrival, deadline, duration, + defaultUser); + } + + private ReservationId submitReservation(int priority, int containers, + long arrival, long deadline, long duration, String user) { + // create an ALL request, with an impossible combination, it should be + // rejected, and allocation remain unchanged + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(arrival); + rr.setDeadline(deadline); + rr.setPriority(Priority.newInstance(priority)); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); + ReservationRequest r = ReservationRequest + .newInstance(Resource.newInstance(1024, 1), containers, 1, duration); + + List list = new ArrayList<>(); + list.add(r); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + try { + // submit to agent + agent.createReservation(reservationID, user, plan, rr); + } catch (PlanningException p) { + p.printStackTrace(); + } + return reservationID; + } + + private boolean updateReservation(ReservationId reservationId, + int containers) { + // Create an ALL request, with an impossible combination, it should be + // rejected, and allocation remain unchanged + ReservationDefinition oldContract = + plan.getReservationById(reservationId).getReservationDefinition(); + ReservationRequests requests = new ReservationRequestsPBImpl(); + requests.setInterpreter(ReservationRequestInterpreter.R_ALL); + // This should always succeed because reservations that are created in this + // test only have one reservation request. + ReservationRequest r = + oldContract.getReservationRequests().getReservationResources().get(0); + r.setNumContainers(containers); + + List list = new ArrayList<>(); + list.add(r); + requests.setReservationResources(list); + oldContract.setReservationRequests(requests); + + try { + return agent.updateReservation(reservationId, "u1", plan, oldContract); + } catch (PlanningException p) { + return false; + } + } + +}