diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationDefinition.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationDefinition.java index bb9bca2..4b419b2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationDefinition.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationDefinition.java @@ -186,9 +186,12 @@ public abstract void setReservationRequests( * indicates a higher priority reservation. Recurring reservations are * always higher priority than non-recurring reservations. Priority for * non-recurring reservations are only compared with non-recurring - * reservations. Likewise for recurring reservations. + * reservations. Likewise for recurring reservations. If the priority is not + * provided, or a negative value is provided in the + * {@link ReservationDefinition} then the lowest possible priority will be + * used. * - * @return int representing the priority of the reserved resource + * @return {@link Priority} representing the priority of the reserved resource * allocation in the scheduler */ @Public @@ -200,7 +203,10 @@ public abstract void setReservationRequests( * indicates a higher priority reservation. Recurring reservations are * always higher priority than non-recurring reservations. Priority for * non-recurring reservations are only compared with non-recurring - * reservations. Likewise for recurring reservations. + * reservations. Likewise for recurring reservations. If the priority is not + * provided, or a negative value is provided in the + * {@link ReservationDefinition} then the lowest possible priority will be + * used. * * @param priority representing the priority of the reserved resource * allocation in the scheduler diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationPriorityScope.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationPriorityScope.java new file mode 100644 index 0000000..b1496e8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationPriorityScope.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Enumeration of various scopes that reservation priority applies to. + */ +@Public +@Evolving +public enum ReservationPriorityScope { + /** + * Reservation priority is applied in the scope of the queue. This means + * that the priority of a reservation will have no effect on reservations + * in different queues. + */ + QUEUE, + + /** + * Reservation priority is applied in the scope of the user. This means + * that the priority of a reservation will only be taken into consideration + * for reservations belonging to the same user. + */ + USER + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index 8769ca1..2f7dcdc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -34,10 +34,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.PriorityReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager; import org.apache.hadoop.yarn.util.Clock; @@ -409,9 +411,15 @@ protected Plan initializePlan(String planQueueName) throws YarnException { Resource maxAllocation = getMaxAllocation(); ResourceCalculator rescCalc = getResourceCalculator(); Resource totCap = getPlanQueueCapacity(planQueueName); + ReservationAgent agent = getAgent(planQueuePath); + if (conf.getBoolean(CapacitySchedulerConfiguration + .ENABLE_RESERVATION_PRIORITY, CapacitySchedulerConfiguration + .DEFAULT_ENABLE_RESERVATION_PRIORITY)) { + agent = getPriorityReservationAgent(planQueueName, agent); + } Plan plan = new InMemoryPlan(getRootQueueMetrics(), adPolicy, - getAgent(planQueuePath), totCap, planStepSize, rescCalc, + agent, totCap, planStepSize, rescCalc, minAllocation, maxAllocation, planQueueName, getReplanner(planQueuePath), getReservationSchedulerConfiguration() .getMoveOnExpiry(planQueuePath), rmContext); @@ -449,19 +457,33 @@ protected ReservationAgent getAgent(String queueName) { String agentClassName = reservationConfig.getReservationAgent(queueName); LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName); try { - Class agentClazz = conf.getClassByName(agentClassName); - if (ReservationAgent.class.isAssignableFrom(agentClazz)) { - return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf); - } else { - throw new YarnRuntimeException("Class: " + agentClassName - + " not instance of " + ReservationAgent.class.getCanonicalName()); - } + return (ReservationAgent) getObject(ReservationAgent.class, + agentClassName); } catch (ClassNotFoundException e) { throw new YarnRuntimeException("Could not instantiate Agent: " + agentClassName + " for queue: " + queueName, e); } } + protected ReservationAgent getPriorityReservationAgent(String queueName, + ReservationAgent workerAgent) { + ReservationSchedulerConfiguration reservationConfig = + getReservationSchedulerConfiguration(); + String agentClassName = reservationConfig.getPriorityReservationAgent( + queueName); + LOG.info("Using Priority Reservation Agent: " + agentClassName + " for " + + "queue: " + queueName); + try { + PriorityReservationAgent agent = (PriorityReservationAgent) getObject( + PriorityReservationAgent.class, agentClassName); + agent.setAgent(workerAgent); + return agent; + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate Priority Agent: " + + agentClassName + " for queue: " + queueName, e); + } + } + protected SharingPolicy getAdmissionPolicy(String queueName) { ReservationSchedulerConfiguration reservationConfig = getReservationSchedulerConfiguration(); @@ -470,21 +492,25 @@ protected SharingPolicy getAdmissionPolicy(String queueName) { LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName + " for queue: " + queueName); try { - Class admissionPolicyClazz = - conf.getClassByName(admissionPolicyClassName); - if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) { - return (SharingPolicy) ReflectionUtils.newInstance( - admissionPolicyClazz, conf); - } else { - throw new YarnRuntimeException("Class: " + admissionPolicyClassName - + " not instance of " + SharingPolicy.class.getCanonicalName()); - } + return (SharingPolicy) getObject(SharingPolicy.class, + admissionPolicyClassName); } catch (ClassNotFoundException e) { throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: " + admissionPolicyClassName + " for queue: " + queueName, e); } } + protected Object getObject(Class baseClazz, String className) + throws ClassNotFoundException { + Class clazz = conf.getClassByName(className); + if (baseClazz.isAssignableFrom(clazz)) { + return ReflectionUtils.newInstance(clazz, conf); + } else { + throw new YarnRuntimeException("Class: " + className + " not instance of " + + baseClazz.getCanonicalName()); + } + } + public ReservationsACLsManager getReservationsACLsManager() { return this.reservationsACLsManager; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java index 740b88c..cfa98d9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java @@ -40,6 +40,11 @@ "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy"; @InterfaceAudience.Private + public static final String DEFAULT_PRIORITY_RESERVATION_AGENT_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning" + + ".SimplePriorityReservationAgent"; + + @InterfaceAudience.Private public static final String DEFAULT_RESERVATION_PLANNER_NAME = "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.SimpleCapacityReplanner"; @@ -133,6 +138,16 @@ public String getReservationAgent(String queue) { } /** + * Gets the name of the {@code PriorityReservationAgent} class associated with + * the queue. + * @param queue name of the queue + * @return the class name of the {@code PriorityReservationAgent} + */ + public String getPriorityReservationAgent(String queue) { + return DEFAULT_PRIORITY_RESERVATION_AGENT_NAME; + } + + /** * Checks whether the reservation queues be hidden or visible * @param queuePath name of the queue * @return true if reservation queues should be visible diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PriorityReservationAgent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PriorityReservationAgent.java new file mode 100644 index 0000000..fe4e12f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PriorityReservationAgent.java @@ -0,0 +1,219 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * This {@link ReservationAgent} is an abstract agent that wraps other + * ReservationAgents to make them priority aware. + * + * {@link PriorityReservationAgent} will attempt to interact with the plan using + * the inner {@link ReservationAgent}. If this fails, it will attempt to make + * room for the reservation based on the method defined in the + * PriorityReservationAgent subclass. + */ +public abstract class PriorityReservationAgent + implements ReservationAgent, Configurable { + + private ReservationAgent agent; + + private final ReentrantReadWriteLock readWriteLock = + new ReentrantReadWriteLock(); + private final Lock writeLock = readWriteLock.writeLock(); + + private static final Log LOG = + LogFactory.getLog(PriorityReservationAgent.class.getName()); + + /** + * Make room for an incoming reservation by attempting to remove other + * reservations in the queue. + * + * @param reservationId the identifier of the reservation to be make roomd + * for. + * @param user the user who the reservation belongs to + * @param plan the Plan to which the reservation must be fitted + * @param contract encapsulates the resources the user requires for his + * reservation + * + * @return an ordered list of {@link ReservationAllocation} that were removed + * to fit the incoming reservation. The order of the list determines + * the order of reservations that will be recreated. + * @throws PlanningException if the reservation cannot be fitted into the plan + */ + public abstract List makeRoomForReservation( + ReservationId reservationId, String user, Plan plan, + ReservationDefinition contract) throws PlanningException; + + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + writeLock.lock(); + try { + agent.createReservation(reservationId, user, plan, contract); + return true; + } catch (PlanningException e) { + LOG.info("Encountered planning exception for reservation=[" + + reservationId.toString() + "] in plan=[" + plan.getQueueName() + "]" + + " when creating the reservation. Attempt to make room for " + + "reservation by removing lower priority reservations. Exception=[" + + e.getMessage() + "]"); + } + + // Normalize priority to ensure that ReservationDefinitions with invalid + // priority have the lowest possible priority. + contract.setPriority(normalizePriority(contract.getPriority())); + List yieldedReservations = + makeRoomForReservation(reservationId, user, plan, contract); + + try { + boolean success = + agent.createReservation(reservationId, user, plan, contract); + addYieldedReservations(yieldedReservations, plan, reservationId); + return success; + } catch (PlanningException e) { + // Reset the plan back to its original state. + addYieldedReservations(yieldedReservations, plan, reservationId, true); + LOG.info("Reservation=[" + reservationId + "] could not be added even " + + "after removing lower priority reservations. Attempt to re-add the " + + "removed reservations."); + throw e; + } finally { + writeLock.unlock(); + } + } + + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + writeLock.lock(); + try { + agent.updateReservation(reservationId, user, plan, contract); + return true; + } catch (PlanningException e) { + LOG.info("Encountered planning exception for reservation=[" + + reservationId.toString() + "] in plan=[" + plan.getQueueName() + "]" + + " when creating the reservation. Attempt to make room for " + + "reservation by removing lower priority reservations. Exception=[" + + e.getMessage() + "]"); + } + + // Normalize priority to ensure that ReservationDefinitions with invalid + // priority have the lowest possible priority. + contract.setPriority(normalizePriority(contract.getPriority())); + List yieldedReservations = + makeRoomForReservation(reservationId, user, plan, contract); + + try { + boolean success = + agent.updateReservation(reservationId, user, plan, contract); + addYieldedReservations(yieldedReservations, plan, reservationId); + return success; + } catch (PlanningException e) { + addYieldedReservations(yieldedReservations, plan, reservationId, true); + LOG.info("Reservation=[" + reservationId + "] could not be added even " + + "after removing lower priority reservations. Attempt to re-add the " + + "removed reservations."); + throw e; + } finally { + writeLock.unlock(); + } + } + + private void addYieldedReservations(List reservations, + Plan plan, ReservationId reservationId) { + addYieldedReservations(reservations, plan, reservationId, false); + } + + private void addYieldedReservations(List reservations, + Plan plan, ReservationId reservationId, boolean addInOrderOfAcceptance) { + if (addInOrderOfAcceptance) { + reservations.sort(new ArrivalTimeComparator()); + } + + for (ReservationAllocation reservation : reservations) { + try { + agent.createReservation(reservation.getReservationId(), + reservation.getUser(), plan, + reservation.getReservationDefinition()); + } catch (PlanningException e) { + LOG.info("Reservation=[" + reservation.getReservationId() + "] was " + + "removed to make room for a higher priority reservation=[" + + reservationId + "]."); + } + } + } + + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + writeLock.lock(); + try { + return agent.deleteReservation(reservationId, user, plan); + } finally { + writeLock.unlock(); + } + } + + public ReservationAgent getAgent() { + return agent; + } + + public void setAgent(ReservationAgent newAgent) { + agent = newAgent; + } + + private Priority normalizePriority(Priority priority) { + // Undefined priority is -1, but if for some user error, the priority + // is less than 0, turn it into maxInt. + if (priority == null || priority.getPriority() < 0) { + return Priority.newInstance(Integer.MAX_VALUE); + } + return priority; + } + + private static class ArrivalTimeComparator + implements Comparator, Serializable { + public int compare(ReservationAllocation reservationA, + ReservationAllocation reservationB) { + ReservationDefinition definitionA = + reservationA == null ? null : reservationA.getReservationDefinition(); + ReservationDefinition definitionB = + reservationB == null ? null : reservationB.getReservationDefinition(); + + if (definitionA == null || definitionB == null) { + return definitionA == definitionB ? 0 : (definitionA == null ? -1 : 1); + } + + return (int) (definitionA.getArrival() - definitionB.getArrival()); + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimplePriorityReservationAgent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimplePriorityReservationAgent.java new file mode 100644 index 0000000..019ba6b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimplePriorityReservationAgent.java @@ -0,0 +1,151 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationPriorityScope; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Set; + +/** + * {@link SimplePriorityReservationAgent} employs the strategy of wrapping a + * configured reservation agent to leverage the algorithm for reservation + * submission, deletion, and update. + * + * If the reservation update, or submission fails, + * {@link SimplePriorityReservationAgent} will delete all reservations in the + * configured {@link ReservationPriorityScope} that are strictly lower priority + * than the offending reservation before trying again. After the offending + * reservation action has either succeeded or failed, the + * {@link SimplePriorityReservationAgent} will attempt to re-add all the + * reservations that have been deleted. + */ +public class SimplePriorityReservationAgent extends PriorityReservationAgent { + + private static final Logger LOG = + LoggerFactory.getLogger(SimplePriorityReservationAgent.class); + + private ReservationPriorityScope scope; + private Configuration configuration; + + public SimplePriorityReservationAgent(Configuration conf) { + setConf(conf); + } + + public List makeRoomForReservation( + ReservationId reservationId, String user, Plan plan, + ReservationDefinition contract) { + + Set reservations; + switch (scope) { + case USER: + // Get all reservations belonging to user; + reservations = plan.getReservations(null, null, user); + break; + case QUEUE: + default: + // Get all reservations in the queue. + reservations = plan.getAllReservations(); + break; + } + + List yieldedReservations = new ArrayList<>(); + + for (ReservationAllocation reservation : reservations) { + if (contract.getPriority().getPriority() < reservation + .getReservationDefinition().getPriority().getPriority()) { + boolean success; + + // Try to delete the reservation. If fail, then do not add it to the + // yielded list, or else it will be re-added as per the + // PriorityReservationAgent contract. InMemoryPlan currently only + // throws unchecked exceptions, which will only be thrown in the + // event that the reservation with the provided id doesn't exist. + try { + success = plan.deleteReservation(reservation.getReservationId()); + } catch (Exception e) { + LOG.error("Encountered exception when deleting reservation with " + + "id=[" + reservation.getReservationId().toString() + "] for " + + "yielding to a higher priority reservation with id=[" + + reservationId.toString() + "] due to exception=[" + + e.getMessage() + "]."); + success = false; + } + if (success) { + yieldedReservations.add(reservation); + } + } + } + + yieldedReservations.sort(new ReservationPriorityComparator()); + return yieldedReservations; + } + + public Configuration getConf() { + return configuration; + } + + public void setConf(Configuration conf) { + configuration = conf; + reinitialize(conf); + } + + private void reinitialize(Configuration conf) { + scope = + conf.getEnum(CapacitySchedulerConfiguration.RESERVATION_PRIORITY_SCOPE, + CapacitySchedulerConfiguration.DEFAULT_RESERVATION_PRIORITY_SCOPE); + } + + private static class ReservationPriorityComparator + implements Comparator, Serializable { + + public int compare(ReservationAllocation reservationA, + ReservationAllocation reservationB) { + ReservationDefinition definitionA = + reservationA == null ? null : reservationA.getReservationDefinition(); + ReservationDefinition definitionB = + reservationB == null ? null : reservationB.getReservationDefinition(); + + if (definitionA == null || definitionB == null) { + return definitionA == definitionB ? 0 : (definitionA == null ? -1 : 1); + } + + int priorityA = definitionA.getPriority().getPriority(); + int priorityB = definitionB.getPriority().getPriority(); + + if (priorityA == priorityB) { + return (int) (definitionA.getArrival() - definitionB.getArrival()); + } + + return priorityA - priorityB; + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 43ec390..153dc93 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.ReservationACL; +import org.apache.hadoop.yarn.api.records.ReservationPriorityScope; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -268,6 +269,20 @@ public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE = "show-reservations-as-queues"; + @Private public static final String ENABLE_RESERVATION_PRIORITY = + "enable-reservation-priority"; + @Private public static final boolean DEFAULT_ENABLE_RESERVATION_PRIORITY = + false; + + @Private public static final String RESERVATION_PRIORITY_SCOPE = + "reservation-priority-scope"; + @Private public static final ReservationPriorityScope + DEFAULT_RESERVATION_PRIORITY_SCOPE = ReservationPriorityScope.QUEUE; + + @Private + public static final String PRIORITY_RESERVATION_AGENT_NAME = + "priority-reservation-agent"; + @Private public static final String RESERVATION_PLANNER_NAME = "reservation-planner"; @@ -1019,6 +1034,20 @@ public void setReservationAgent(String queue, String reservationPolicy) { } @Override + public String getPriorityReservationAgent(String queue) { + String priorityReservationAgent = + get(getQueuePrefix(queue) + PRIORITY_RESERVATION_AGENT_NAME, + DEFAULT_PRIORITY_RESERVATION_AGENT_NAME); + return priorityReservationAgent; + } + + public void setPriorityReservationAgent(String queue, + String reservationPolicy) { + set(getQueuePrefix(queue) + PRIORITY_RESERVATION_AGENT_NAME, + reservationPolicy); + } + + @Override public boolean getShowReservationAsQueues(String queuePath) { boolean showReservationAsQueues = getBoolean(getQueuePrefix(queuePath) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index bd0602b..745a5ef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -2187,7 +2187,8 @@ private ReservationSubmissionRequest createReservationSubmissionRequest( ReservationRequests reqs = ReservationRequests.newInstance(list, resInt); ReservationDefinition rDef = ReservationDefinition.newInstance(resInfo.getArrival(), - resInfo.getDeadline(), reqs, resInfo.getReservationName()); + resInfo.getDeadline(), reqs, resInfo.getReservationName(), "0", + Priority.newInstance(resInfo.getPriority())); ReservationId reservationId = ReservationId.parseReservationId(resContext .getReservationId()); @@ -2301,7 +2302,8 @@ private ReservationUpdateRequest createReservationUpdateRequest( ReservationRequests reqs = ReservationRequests.newInstance(list, resInt); ReservationDefinition rDef = ReservationDefinition.newInstance(resInfo.getArrival(), - resInfo.getDeadline(), reqs, resInfo.getReservationName()); + resInfo.getDeadline(), reqs, resInfo.getReservationName(), "0", + Priority.newInstance(resInfo.getPriority())); ReservationUpdateRequest request = ReservationUpdateRequest.newInstance(rDef, ReservationId .parseReservationId(resContext.getReservationId())); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ReservationDefinitionInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ReservationDefinitionInfo.java index 42a07af..4259ff5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ReservationDefinitionInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ReservationDefinitionInfo.java @@ -44,7 +44,7 @@ @XmlElement(name = "reservation-name") private String reservationName; - @XmlElement(name = "priority") + @XmlElement(name = "priority", defaultValue = "-1") private int priority; public ReservationDefinitionInfo() { @@ -57,6 +57,7 @@ public ReservationDefinitionInfo(ReservationDefinition definition) { reservationName = definition.getReservationName(); reservationRequests = new ReservationRequestsInfo(definition .getReservationRequests()); + priority = definition.getPriority().getPriority(); } public long getArrival() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java index 03bc889..1ecb9c5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java @@ -389,8 +389,8 @@ private ReservationDefinition makeSimpleReservationDefinition() { String reservationName = UUID.randomUUID().toString(); return ReservationDefinition.newInstance - (arrival, arrival + (int)(defaultDuration * 1.1), defaultRequests, - reservationName); + (arrival, arrival + (int) (defaultDuration * 1.1), defaultRequests, + reservationName); } private ReservationListResponse listReservationById(String lister, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 1ff6a1a..149fa14 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -178,11 +178,13 @@ public static FairScheduler setupFairScheduler(RMContext rmContext, public static ReservationDefinition createSimpleReservationDefinition( long arrival, long deadline, long duration) { - return createSimpleReservationDefinition(arrival, deadline, duration, 1); + return createSimpleReservationDefinition(arrival, deadline, duration, 1, + 0); } public static ReservationDefinition createSimpleReservationDefinition( - long arrival, long deadline, long duration, int parallelism) { + long arrival, long deadline, long duration, int parallelism, + int priority) { // create a request with a single atomic ask ReservationRequest r = ReservationRequest.newInstance(Resource.newInstance(1024, 1), @@ -194,6 +196,7 @@ public static ReservationDefinition createSimpleReservationDefinition( rDef.setReservationRequests(reqs); rDef.setArrival(arrival); rDef.setDeadline(deadline); + rDef.setPriority(Priority.newInstance(priority)); return rDef; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimplePriorityReservationAgent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimplePriorityReservationAgent.java new file mode 100644 index 0000000..268b05f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimplePriorityReservationAgent.java @@ -0,0 +1,798 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationPriorityScope; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +/** + * {@link TestSimplePriorityReservationAgent} verifies the logic of the + * {@link SimplePriorityReservationAgent}. + */ +public class TestSimplePriorityReservationAgent { + + private final String defaultUser = "u1"; + private ReservationSchedulerConfiguration conf; + private PriorityReservationAgent agent; + private InMemoryPlan plan; + private Resource minAlloc = Resource.newInstance(1024, 1); + private ResourceCalculator res = new DefaultResourceCalculator(); + private Resource maxAlloc = Resource.newInstance(1024 * 8, 8); + private Random rand = new Random(); + private int numContainers = 100; + private long step; + + @Before + public void setup() throws Exception { + + float instConstraint = 100; + float avgConstraint = 100; + long timeWindow = 1000000L; + long seed = rand.nextLong(); + rand.setSeed(seed); + step = 1000L; + + QueueMetrics queueMetrics = mock(QueueMetrics.class); + RMContext context = ReservationSystemTestUtil.createMockRMContext(); + Resource clusterCapacity = + Resource.newInstance(numContainers * 1024, numContainers); + String reservationQ = + ReservationSystemTestUtil.getFullReservationQueueName(); + + conf = ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); + + CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + policy.init(reservationQ, conf); + + ReservationAgent workerAgent = new GreedyReservationAgent(conf); + agent = new SimplePriorityReservationAgent(conf); + agent.setAgent(workerAgent); + + plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, + res, minAlloc, maxAlloc, "dedicated", null, true, context); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitSimple() throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(1, numContainers / 2); + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation2 = submitReservation(1, numContainers / 2); + + // Reservation submission should work with reservation priority enabled. + assertReservationsInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitWithSamePriorityNoFitQueue() throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(1, numContainers / 2); + ReservationId reservation2 = submitReservation(1, numContainers / 2); + ReservationId reservation3 = submitReservation(1, numContainers / 2); + + // Verify that only the first two reservations succeed. + assertReservationsInPlan(reservation1, reservation2); + assertReservationsNotInPlan(reservation3); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitWithSamePriorityNoFitUser() throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation1 = submitReservation(1, numContainers / 2); + ReservationId reservation2 = submitReservation(1, numContainers / 2); + ReservationId reservation3 = submitReservation(1, numContainers / 2); + + // Verify that only the first two reservations succeed. + assertReservationsInPlan(reservation1, reservation2); + assertReservationsNotInPlan(reservation3); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitWithHigherPriorityNoFitQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(3, numContainers / 2); + ReservationId reservation2 = submitReservation(2, numContainers / 2); + + // Verify that the first two reservations succeed. + assertReservationsInPlan(reservation1, reservation2); + + ReservationId reservation3 = submitReservation(1, numContainers / 2, "u4"); + + // Verify that the reservations with the highest priority succeed. + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitWithHigherPriorityNoFitUser() throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation1 = submitReservation(3, numContainers / 2); + ReservationId reservation2 = submitReservation(2, numContainers / 2, "u2"); + + // Verify that the first two reservations succeed. + assertReservationsInPlan(reservation1, reservation2); + + ReservationId reservation3 = submitReservation(1, numContainers / 2, "u2"); + + // Verify that the reservations with the highest priority for each user + // succeeds. + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + + ReservationId reservation4 = submitReservation(1, numContainers / 2); + + // Verify that the reservations with the highest priority for each user + // succeeds. + assertReservationsInPlan(reservation3, reservation4); + assertReservationsNotInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitMultipleLowerPriorityExistsNoFitQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(3, numContainers / 2); + ReservationId reservation2 = submitReservation(2, numContainers / 2); + ReservationId reservation3 = submitReservation(1, numContainers); + + assertReservationsInPlan(reservation3); + assertReservationsNotInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitMultipleLowerPriorityExistsNoFitUser() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation1 = submitReservation(3, numContainers / 2); + ReservationId reservation2 = submitReservation(2, numContainers / 2); + ReservationId reservation3 = submitReservation(1, numContainers, "u2"); + + // Ensure priority has no effect on reservations for different user if + // the scope is set to USER. + assertReservationsInPlan(reservation1, reservation2); + assertReservationsNotInPlan(reservation3); + + ReservationId reservation4 = submitReservation(1, numContainers); + + // Since reservation4 is made by the same user that submitted the + // existing reservations, and is higher priority, other reservations + // should be yielded. + assertReservationsInPlan(reservation4); + assertReservationsNotInPlan(reservation1, reservation2, reservation3); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitRemovalInPriorityThenArrivalOrderQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = + submitReservation(2, numContainers / 2, 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = + submitReservation(2, numContainers / 2, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(1, numContainers / 2, 2 * step, 6 * step, 4 * step); + + // Reservation2 starts earlier than reservation1, and has the same + // priority. This means that reservation1 should be the first to be + // yielded for reservation3. + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1); + + ReservationId reservation4 = submitReservation(1, numContainers / 2, + 2 * step, 6 * step, 4 * step, "u2"); + + // Reservation4 yields reservation2, because it is higher priority, even + // if it is submitted with a different user. + assertReservationsInPlan(reservation3, reservation4); + assertReservationsNotInPlan(reservation1, reservation2); + + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitRemovalInPriorityThenArrivalOrderUser() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation1 = + submitReservation(3, numContainers / 2, 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = + submitReservation(3, numContainers / 2, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(2, numContainers / 2, 2 * step, 6 * step, 4 * step); + + // Reservation3 will yield reservation1 because it starts earlier + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1); + + ReservationId reservation4 = submitReservation(2, numContainers / 2, + 2 * step, 6 * step, 4 * step, "u2"); + + // Reservation2 starts earlier than reservation1, and has the same + // priority. This means that reservation1 should be the first to be + // yielded for reservation3. Reservation4 belongs to a different user, so + // it cannot yield other reservations. + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1, reservation4); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitRemovalInArrivalOrderUnlessNoFit() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(2, + (int) (0.4 * numContainers), 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = submitReservation(2, + (int) (0.6 * numContainers), step, 5 * step, 4 * step); + ReservationId reservation3 = submitReservation(1, + (int) (0.5 * numContainers), 2 * step, 6 * step, 4 * step, "u2"); + + // Reservation2 is deleted despite starting earlier because deleting + // reservation1 would not have caused reservation3 to fit. + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitRemovalInPriorityOrderUnlessNoFit() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(3, + (int) (0.4 * numContainers), 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = submitReservation(2, + (int) (0.6 * numContainers), step, 5 * step, 4 * step); + ReservationId reservation3 = submitReservation(1, + (int) (0.5 * numContainers), 2 * step, 6 * step, 4 * step, "u2"); + + // Reservation2 is deleted despite having a higher priority because deleting + // reservation1 would not have caused reservation3 to fit. + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitLowerPriorityRemovedRegardlessOfStartTime() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = + submitReservation(2, numContainers / 2, 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = + submitReservation(3, numContainers / 2, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(1, numContainers / 2, 2 * step, 6 * step, 4 * step); + + // Reservation2 is deleted despite starting the earliest, because it has + // lowest priority. + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitOnlyLowerPriorityReservationsGetRemovedQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = + submitReservation(3, numContainers / 2, 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = + submitReservation(1, numContainers / 2, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(2, numContainers / 2, 2 * step, 6 * step, 4 * step); + + // Reservation1 is deleted because it is lowest priority. + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitOnlyLowerPriorityReservationsGetRemovedUser() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation1 = + submitReservation(3, numContainers / 2, step, 7 * step, 4 * step, "u2"); + ReservationId reservation2 = + submitReservation(1, numContainers / 2, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(2, numContainers / 2, 2 * step, 6 * step, 4 * step); + + // Reservation3 is rejected despite there existing a reservation with + // lower priority because it belongs to a different user. + assertReservationsInPlan(reservation1, reservation2); + assertReservationsNotInPlan(reservation3); + } + + @SuppressWarnings("javadoc") + @Test + public void testSubmitFailureWithLowerPriorityNoFit() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(2, numContainers / 2); + ReservationId reservation2 = submitReservation(2, numContainers / 2); + ReservationId reservation3 = submitReservation(3, numContainers / 2); + + assertReservationsInPlan(reservation1, reservation2); + assertReservationsNotInPlan(reservation3); + } + + @SuppressWarnings("javadoc") + @Test + public void testReservationReAddInArrivalOrderIfNoFitAfterAccommodation() + throws PlanningException { + // If the PriorityReservationAgent fails in adding the new reservation + // after yielding, the yielded reservations will be added in order of + // arrival time. + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = + submitReservation(1, numContainers / 2, step, 10 * step, 9 * step); + ReservationId reservation2 = + submitReservation(3, numContainers / 2, 2 * step, 4 * step, 2 * step); + ReservationId reservation3 = + submitReservation(2, numContainers / 2, 3 * step, 10 * step, 6 * step); + ReservationId reservation4 = + submitReservation(1, numContainers, step, 10 * step, 9 * step); + + assertReservationsInPlan(reservation1, reservation2, reservation3); + assertReservationsNotInPlan(reservation4); + } + + @SuppressWarnings("javadoc") + @Test + public void testReservationReAddInPriorityOrderIfFitAfterAccommodation() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + // If the PriorityReservationAgent succeeds in adding the new reservation + // after yielding, the yielded reservations will be added in priority + // then arrival order. + ReservationId reservation1 = + submitReservation(1, numContainers / 3, step, 10 * step, 9 * step); + ReservationId reservation2 = + submitReservation(3, numContainers / 3, 2 * step, 4 * step, 2 * step); + ReservationId reservation3 = + submitReservation(2, numContainers / 3, 3 * step, 10 * step, 6 * step); + ReservationId reservation4 = + submitReservation(1, numContainers / 3, step, 10 * step, 9 * step); + + assertReservationsInPlan(reservation1, reservation3, reservation4); + assertReservationsNotInPlan(reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateSimple() throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(1, numContainers / 2); + boolean result = updateReservation(reservation1, numContainers); + + assertTrue("Reservation update should succeed if it can fit.", result); + assertReservationsInPlan(reservation1); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateWithSamePriorityNoFit() throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(1, numContainers / 2); + ReservationId reservation2 = submitReservation(1, numContainers / 2); + boolean result = updateReservation(reservation2, numContainers); + + assertFalse("Reservation update should fail if the new contract will not " + + "fit and there are no other reservations that have a lower " + + "priority.", result); + assertReservationsInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateWithHigherPriorityNoFitQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(3, numContainers / 2, "u2"); + ReservationId reservation2 = submitReservation(1, numContainers / 2); + boolean result = updateReservation(reservation2, numContainers); + + assertTrue("Reservation update should succeed because the reservation is " + + "highest priority.", result); + assertReservationsInPlan(reservation2); + assertReservationsNotInPlan(reservation1); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateWithHigherPriorityNoFitUser() throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + ReservationId reservation1 = submitReservation(3, numContainers / 2, "u2"); + ReservationId reservation2 = submitReservation(1, numContainers / 2); + boolean result = updateReservation(reservation2, numContainers); + + assertFalse( + "Reservation update should not succeed because the " + + "increasing the reservation size will cause it not to fit.", + result); + assertReservationsInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateMultipleLowerPriorityExistsNoFitQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + int containers = (int) (0.3 * numContainers); + ReservationId reservation1 = submitReservation(3, containers); + ReservationId reservation2 = submitReservation(2, containers, "u2"); + ReservationId reservation3 = submitReservation(1, containers); + + boolean result = updateReservation(reservation3, numContainers); + + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertReservationsInPlan(reservation3); + assertReservationsNotInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateMultipleLowerPriorityExistsNoFitUser() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + int containers = (int) (0.3 * numContainers); + ReservationId reservation1 = submitReservation(2, containers); + ReservationId reservation2 = submitReservation(3, containers, "u2"); + ReservationId reservation3 = submitReservation(1, containers); + + boolean result = updateReservation(reservation3, numContainers); + + assertFalse( + "Reservation update should not succeed because not " + + "enough reservations can be yielded to accept the update.", + result); + assertReservationsInPlan(reservation1, reservation2, reservation3); + + result = updateReservation(reservation3, containers * 2); + + assertTrue("Reservation update should succeed because a lower priority " + + "reservation exist.", result); + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateRemovalInPriorityThenArrivalOrderQueue() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + int containers = (int) (0.3 * numContainers); + ReservationId reservation1 = + submitReservation(2, containers, step, 5 * step, 4 * step); + ReservationId reservation2 = + submitReservation(2, containers, 3 * step, 7 * step, 4 * step); + ReservationId reservation3 = + submitReservation(1, containers, 2 * step, 6 * step, 4 * step); + + int newContainers = (int) (0.5 * numContainers); + boolean result = updateReservation(reservation3, newContainers); + + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + } + + @Test + public void testUpdateRemovalInPriorityThenArrivalOrderUser() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.USER); + int containers = (int) (0.3 * numContainers); + ReservationId reservation1 = + submitReservation(2, containers, step, 5 * step, 4 * step); + ReservationId reservation2 = + submitReservation(2, containers, 3 * step, 7 * step, 4 * step, "u2"); + ReservationId reservation3 = + submitReservation(1, containers, 2 * step, 6 * step, 4 * step); + + int newContainers = (int) (0.5 * numContainers); + boolean result = updateReservation(reservation3, newContainers); + + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertReservationsInPlan(reservation2, reservation3); + assertReservationsNotInPlan(reservation1); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateRemovalInPriorityThenArrivalOrderUnlessNoFit() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(2, + (int) (0.2 * numContainers), 3 * step, 7 * step, 4 * step); + // This should get removed despite the earlier arrival time because it + // cannot fit with the higher priority reservation. + ReservationId reservation2 = submitReservation(2, + (int) (0.6 * numContainers), step, 5 * step, 4 * step); + ReservationId reservation3 = submitReservation(1, + (int) (0.2 * numContainers), 2 * step, 6 * step, 4 * step); + + boolean result = + updateReservation(reservation3, (int) (0.7 * numContainers)); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateLowerPriorityRemovedRegardlessOfStartTime() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + int containers = (int) (0.3 * numContainers); + + // Lower priority than reservation 3, but later start time than the + // second reservation. + ReservationId reservation1 = + submitReservation(2, containers, 3 * step, 7 * step, 4 * step); + // Lowest priority, but earliest startTime. + ReservationId reservation2 = + submitReservation(3, containers, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(1, containers, 2 * step, 6 * step, 4 * step); + + boolean result = updateReservation(reservation3, numContainers / 2); + + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateOnlyLowerPriorityReservationsGetRemoved() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + int containers = (int) (0.3 * numContainers); + + ReservationId reservation1 = + submitReservation(1, containers, 3 * step, 7 * step, 4 * step); + ReservationId reservation2 = + submitReservation(3, containers, step, 5 * step, 4 * step); + ReservationId reservation3 = + submitReservation(2, containers, 2 * step, 6 * step, 4 * step); + + boolean result = updateReservation(reservation3, numContainers / 2); + + assertTrue("Reservation update should succeed because lower priority " + + "reservations exist.", result); + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateSuccessWithHigherPriorityFit() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(2, numContainers / 2); + ReservationId reservation2 = submitReservation(1, numContainers / 3); + + boolean result = updateReservation(reservation2, numContainers / 2); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation should succeed if it can fit.", result); + assertReservationsInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testUpdateFailureWithLowerPriorityNoFit() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = submitReservation(1, numContainers / 2); + ReservationId reservation2 = submitReservation(2, numContainers / 2); + + boolean result = updateReservation(reservation2, numContainers); + + // validate results, we expect the second one to be accepted + assertFalse("Reservation should fail if it cannot fit and there are no " + + "other reservations that have a lower priority.", result); + assertReservationsInPlan(reservation1, reservation2); + } + + @SuppressWarnings("javadoc") + @Test + public void testReservationReUpdateInArrivalOrderIfNoFitAfterAccommodation() + throws PlanningException { + // If the PriorityReservationAgent fails in adding the new reservation + // after yielding, the yielded reservations will be added in order of + // arrival time. + setPriorityScope(ReservationPriorityScope.QUEUE); + ReservationId reservation1 = + submitReservation(1, numContainers / 2, step, 10 * step, 9 * step); + ReservationId reservation2 = + submitReservation(3, numContainers / 2, 2 * step, 4 * step, 2 * step); + ReservationId reservation3 = + submitReservation(2, numContainers / 2, 3 * step, 10 * step, 6 * step); + + // This update will fail, even with lower priority reservations removed. + boolean result = updateReservation(reservation1, 2 * numContainers); + + // validate results, we expect the second one to be accepted + assertFalse("Reservation update should fail if it cannot fit.", result); + assertReservationsInPlan(reservation1, reservation2, reservation3); + } + + @SuppressWarnings("javadoc") + @Test + public void testReservationReUpdateInPriorityOrderIfFitAfterAccommodation() + throws PlanningException { + setPriorityScope(ReservationPriorityScope.QUEUE); + // If the PriorityReservationAgent succeeds in adding the new reservation + // after yielding, the yielded reservations will be added in priority + // then arrival order. + ReservationId reservation1 = + submitReservation(1, numContainers / 4, step, 10 * step, 9 * step); + ReservationId reservation2 = + submitReservation(3, numContainers / 2, 2 * step, 4 * step, 2 * step); + ReservationId reservation3 = + submitReservation(2, numContainers / 2, 3 * step, 10 * step, 6 * step); + + // This update will fail, even with lower priority reservations removed. + boolean result = updateReservation(reservation1, numContainers / 2); + + // validate results, we expect the second one to be accepted + assertTrue("Reservation update should succeed if it can fit.", result); + assertReservationsInPlan(reservation1, reservation3); + assertReservationsNotInPlan(reservation2); + } + + private void setPriorityScope(ReservationPriorityScope scope) { + conf.setEnum(CapacitySchedulerConfiguration.RESERVATION_PRIORITY_SCOPE, + scope); + agent.setConf(conf); + } + + private void assertReservationsNotInPlan(ReservationId... reservationIds) { + for (ReservationId reservationId : reservationIds) { + assertTrue("Reservation ID " + reservationId + " shouldn't exist in plan", + plan.getReservationById(reservationId) == null); + } + } + + private void assertReservationsInPlan(ReservationId... reservationIds) { + for (ReservationId reservationId : reservationIds) { + assertTrue("Reservation ID " + reservationId + " should exist in plan", + plan.getReservationById(reservationId) != null); + } + } + + private ReservationId submitReservation(int priority, int containers) { + return submitReservation(priority, containers, step, 2 * step, step, + defaultUser); + } + + private ReservationId submitReservation(int priority, int containers, + String user) { + return submitReservation(priority, containers, step, 2 * step, step, user); + } + + private ReservationId submitReservation(int priority, int containers, + long arrival, long deadline, long duration) { + return submitReservation(priority, containers, arrival, deadline, duration, + defaultUser); + } + + private ReservationId submitReservation(int priority, int containers, + long arrival, long deadline, long duration, String user) { + // create an ALL request, with an impossible combination, it should be + // rejected, and allocation remain unchanged + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(arrival); + rr.setDeadline(deadline); + rr.setPriority(Priority.newInstance(priority)); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); + ReservationRequest r = ReservationRequest + .newInstance(Resource.newInstance(1024, 1), containers, 1, duration); + + List list = new ArrayList<>(); + list.add(r); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + try { + // submit to agent + agent.createReservation(reservationID, user, plan, rr); + } catch (PlanningException p) { + p.printStackTrace(); + } + return reservationID; + } + + private boolean updateReservation(ReservationId reservationId, + int containers) { + // Create an ALL request, with an impossible combination, it should be + // rejected, and allocation remain unchanged + ReservationDefinition oldContract = + plan.getReservationById(reservationId).getReservationDefinition(); + ReservationRequests requests = new ReservationRequestsPBImpl(); + requests.setInterpreter(ReservationRequestInterpreter.R_ALL); + // This should always succeed because reservations that are created in this + // test only have one reservation request. + ReservationRequest r = + oldContract.getReservationRequests().getReservationResources().get(0); + r.setNumContainers(containers); + + List list = new ArrayList<>(); + list.add(r); + requests.setReservationResources(list); + oldContract.setReservationRequests(requests); + + try { + return agent.updateReservation(reservationId, "u1", plan, oldContract); + } catch (PlanningException p) { + return false; + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md index 48dea5a..9e4a717 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md @@ -3344,7 +3344,7 @@ The Cluster Reservation API can be used to list reservations. When listing reser | deadline | long | The UTC time representation of the latest time within which this reservation can be allocated. | | reservation-name | string | A mnemonic name of the reservation (not a valid identifier). | | reservation-requests | object | A list of "stages" or phases of this reservation, each describing resource requirements and duration | -| priority | int | An integer representing the priority of the reservation. A lower number for priority indicates a higher priority reservation. Recurring reservations are always higher priority than non-recurring reservations. Priority for non-recurring reservations are only compared with non-recurring reservations. Likewise with recurring reservations. | +| priority | int | A positive integer representing the priority of the reservation. A lower number for priority indicates a higher priority reservation. Recurring reservations are always higher priority than non-recurring reservations. Priority for non-recurring reservations are only compared with non-recurring reservations. Likewise with recurring reservations. If a priority is not provided or if a negative priority is provided, the reservation will be set with the lowest possible priority. | ### Elements of the *reservation-requests* object @@ -3608,7 +3608,7 @@ Elements of the *reservation-definition* object | deadline | long | The UTC time representation of the latest time within which this reservation can be allocated. | | reservation-name | string | A mnemonic name of the reservation (not a valid identifier). | | reservation-requests | object | A list of "stages" or phases of this reservation, each describing resource requirements and duration | -| priority | int | An integer representing the priority of the reservation. A lower number for priority indicates a higher priority reservation. Recurring reservations are always higher priority than non-recurring reservations. Priority for non-recurring reservations are only compared with non-recurring reservations. Likewise with recurring reservations. | +| priority | int | A positive integer representing the priority of the reservation. A lower number for priority indicates a higher priority reservation. Recurring reservations are always higher priority than non-recurring reservations. Priority for non-recurring reservations are only compared with non-recurring reservations. Likewise with recurring reservations. If a priority is not provided or if a negative priority is provided, the reservation will be set with the lowest possible priority. | Elements of the *reservation-requests* object @@ -3784,7 +3784,7 @@ Elements of the *reservation-definition* object | deadline | long | The UTC time representation of the latest time within which this reservation can be allocated. | | reservation-name | string | A mnemonic name of the reservation (not a valid identifier). | | reservation-requests | object | A list of "stages" or phases of this reservation, each describing resource requirements and duration | -| priority | int | An integer representing the priority of the reservation. A lower number for priority indicates a higher priority reservation. Recurring reservations are always higher priority than non-recurring reservations. Priority for non-recurring reservations are only compared with non-recurring reservations. Likewise with recurring reservations. | +| priority | int | A positive integer representing the priority of the reservation. A lower number for priority indicates a higher priority reservation. Recurring reservations are always higher priority than non-recurring reservations. Priority for non-recurring reservations are only compared with non-recurring reservations. Likewise with recurring reservations. If a priority is not provided or if a negative priority is provided, the reservation will be set with the lowest possible priority. | Elements of the *reservation-requests* object