diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AlignedPlannerWithGreedy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AlignedPlannerWithGreedy.java new file mode 100644 index 0000000..9f347fa --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AlignedPlannerWithGreedy.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A planning algorithm that first runs LowCostAligned, and if it fails runs + * Greedy. + */ +public class AlignedPlannerWithGreedy implements ReservationAgent { + + // Default smoothness factor + private static final int DEFAULT_SMOOTHNESS_FACTOR = 10; + + // Log + private static final Logger LOG = LoggerFactory + .getLogger(AlignedPlannerWithGreedy.class); + + // Smoothness factor + private final ReservationAgent planner; + + // Constructor + public AlignedPlannerWithGreedy() { + this(DEFAULT_SMOOTHNESS_FACTOR); + } + + // Constructor + public AlignedPlannerWithGreedy(int smoothnessFactor) { + + // List of algorithms + List listAlg = new LinkedList(); + + // LowCostAligned planning algorithm + ReservationAgent algAligned = + new IterativePlanner(new StageEarliestStartByDemand(), + new StageAllocatorLowCostAligned(smoothnessFactor)); + listAlg.add(algAligned); + + // Greedy planning algorithm + ReservationAgent algGreedy = + new IterativePlanner(new StageEarliestStartByJobArrival(), + new StageAllocatorGreedy()); + listAlg.add(algGreedy); + + // Set planner: + // 1. Attempt to execute algAligned + // 2. If failed, fall back to algGreedy + planner = new TryManyReservationAgents(listAlg); + + } + + @Override + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + LOG.info("placing the following ReservationRequest: " + contract); + + try { + boolean res = + planner.createReservation(reservationId, user, plan, contract); + + if (res) { + LOG.info("OUTCOME: SUCCESS, Reservation ID: " + + reservationId.toString() + ", Contract: " + contract.toString()); + } else { + LOG.info("OUTCOME: FAILURE, Reservation ID: " + + reservationId.toString() + ", Contract: " + contract.toString()); + } + return res; + } catch (PlanningException e) { + LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString() + + ", Contract: " + contract.toString()); + throw e; + } + + } + + @Override + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + LOG.info("updating the following ReservationRequest: " + contract); + + return planner.updateReservation(reservationId, user, plan, contract); + + } + + @Override + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + + LOG.info("removing the following ReservationId: " + reservationId); + + return planner.deleteReservation(reservationId, user, plan); + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/IStageAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/IStageAllocator.java new file mode 100644 index 0000000..8883ba5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/IStageAllocator.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * Interface for allocating a single stage in IterativePlanner. + */ +public interface IStageAllocator { + + /** + * Computes the allocation of a stage inside a defined time interval. + * + * @param plan the Plan to which the reservation must be fitted + * @param planLoads a 'dirty' read of the plan loads at each time + * @param planModifications the allocations performed by the planning + * algorithm which are not yet reflected by plan + * @param rr the stage + * @param stageEarliestStart the arrival time (earliest starting time) set for + * the stage by the two phase planning algorithm + * @param stageDeadline the deadline of the stage set by the two phase + * planning algorithm + * + * @return The computed allocation (or null if the stage could not be + * allocated) + */ + Map computeStageAllocation( + Plan plan, Map planLoads, + RLESparseResourceAllocation planModifications, ReservationRequest rr, + long stageEarliestStart, long stageDeadline); + +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/IStageEarliestStart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/IStageEarliestStart.java new file mode 100644 index 0000000..eacdbac --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/IStageEarliestStart.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; + +/** + * Interface for setting the earliest start time of a stage in IterativePlanner. + */ +public interface IStageEarliestStart { + + /** + * Computes the earliest allowed starting time for a given stage. + * + * @param plan the Plan to which the reservation must be fitted + * @param reservation the job contract + * @param index the index of the stage in the job contract + * @param currentReservationStage the stage + * @param stageDeadline the deadline of the stage set by the two phase + * planning algorithm + * + * @return the earliest allowed starting time for the stage. + */ + long setEarliestStartTime(Plan plan, ReservationDefinition reservation, + int index, ReservationRequest currentReservationStage, + long stageDeadline); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/IterativePlanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/IterativePlanner.java new file mode 100644 index 0000000..1940e5a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/IterativePlanner.java @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.HashMap; +import java.util.ListIterator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * A planning algorithm consisting of two main phases. The algorithm iterates + * over the job stages in descending order. For each stage, the algorithm: 1. + * Determines an interval [stageArrivalTime, stageDeadline) in which the stage + * is allocated. 2. Computes an allocation for the stage inside the interval. + * + * For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be + * [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of + * each stage is set as succcessorStartTime - the starting time of its + * succeeding stage (or jobDeadline if it is the last stage). + * + * The phases are set using the two functions: 1. setAlgEarliestStartTime 2. + * setAlgComputeStageAllocation + */ +public class IterativePlanner extends PlanningAlgorithm { + + // Modifications performed by the algorithm that are not been reflected in the + // actual plan while a request is still pending. + private RLESparseResourceAllocation planModifications; + + // Data extracted from plan + private Map planLoads; + private Resource capacity; + private long step; + + // Job parameters + private ReservationRequestInterpreter jobType; + private long jobArrival; + private long jobDeadline; + + // Phase algorithms + private IStageEarliestStart algStageEarliestStart = null; + private IStageAllocator algStageAllocator = null; + + // Constructor + public IterativePlanner(IStageEarliestStart algEarliestStartTime, + IStageAllocator algStageAllocator) { + + setAlgStageEarliestStart(algEarliestStartTime); + setAlgStageAllocator(algStageAllocator); + + } + + @Override + public RLESparseResourceAllocation computeJobAllocation(Plan plan, + ReservationId reservationId, ReservationDefinition reservation) + throws ContractValidationException { + + // Initialize + initialize(plan, reservation); + + // If the job has been previously reserved, logically remove its allocation + ReservationAllocation oldReservation = + plan.getReservationById(reservationId); + if (oldReservation != null) { + ignoreOldAllocation(oldReservation); + } + + // Create the allocations data structure + // Map allocations = + // new HashMap(); + RLESparseResourceAllocation allocations = + new RLESparseResourceAllocation(plan.getResourceCalculator(), + plan.getMinimumAllocation()); + + // Get a reverse iterator for the set of stages + ListIterator li = + reservation + .getReservationRequests() + .getReservationResources() + .listIterator( + reservation.getReservationRequests().getReservationResources() + .size()); + + // Current stage + ReservationRequest currentReservationStage; + + // Index, points on the current node + int index = + reservation.getReservationRequests().getReservationResources().size(); + + // Stage deadlines + long stageDeadline = stepRoundDown(reservation.getDeadline(), step); + long successorStartingTime = -1; + + // Iterate the stages in reverse order + while (li.hasPrevious()) { + + // Get current stage + currentReservationStage = li.previous(); + index -= 1; + + // Validate that the ReservationRequest respects basic constraints + validateInputStage(plan, currentReservationStage); + + // Compute an adjusted earliestStart for this resource + // (we need this to provision some space for the ORDER contracts) + long stageArrivalTime = reservation.getArrival(); + if (jobType == ReservationRequestInterpreter.R_ORDER + || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { + stageArrivalTime = + computeEarliestStartingTime(plan, reservation, index, + currentReservationStage, stageDeadline); + } + stageArrivalTime = stepRoundUp(stageArrivalTime, step); + stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival()); + + // Compute the allocation of a single stage + Map curAlloc = + computeStageAllocation(plan, currentReservationStage, + stageArrivalTime, stageDeadline); + + // If we did not find an allocation, return NULL + // (unless it's an ANY job, then we simply continue). + if (curAlloc == null) { + + // If it's an ANY job, we can move to the next possible request + if (jobType == ReservationRequestInterpreter.R_ANY) { + continue; + } + + // Otherwise, the job cannot be allocated + return null; + + } + + // Get the start & end time of the current allocation + Long stageStartTime = findEarliestTime(curAlloc.keySet()); + Long stageEndTime = findLatestTime(curAlloc.keySet()); + + // If we did find an allocation for the stage, add it + for (Entry entry : curAlloc + .entrySet()) { + allocations.addInterval(entry.getKey(), entry.getValue()); + } + + // If this is an ANY clause, we have finished + if (jobType == ReservationRequestInterpreter.R_ANY) { + break; + } + + // If ORDER job, set the stageDeadline of the next stage to be processed + if (jobType == ReservationRequestInterpreter.R_ORDER + || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { + + // Verify that there is no gap, in case the job is ORDER_NO_GAP + if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP + && successorStartingTime != -1 + && successorStartingTime > stageEndTime) { + + return null; + + } + + // Store the stageStartTime and set the new stageDeadline + successorStartingTime = stageStartTime; + stageDeadline = stageStartTime; + + } + + } + + // If the allocation is empty, return an error + if (allocations.isEmpty()) { + return null; + } + + return allocations; + + } + + protected void initialize(Plan plan, ReservationDefinition reservation) { + + // Get plan step & capacity + capacity = plan.getTotalCapacity(); + step = plan.getStep(); + + // Get job parameters (type, arrival time & deadline) + jobType = reservation.getReservationRequests().getInterpreter(); + jobArrival = stepRoundUp(reservation.getArrival(), step); + jobDeadline = stepRoundDown(reservation.getDeadline(), step); + + // Dirty read of plan load + planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline); + + // Initialize the plan modifications + planModifications = + new RLESparseResourceAllocation(plan.getResourceCalculator(), + plan.getMinimumAllocation()); + + } + + private Map getAllLoadsInInterval(Plan plan, long startTime, + long endTime) { + + // Create map + Map loads = new HashMap(); + + // Calculate the load for every time slot between [start,end) + for (long t = startTime; t < endTime; t += step) { + Resource load = plan.getTotalCommittedResources(t); + loads.put(t, load); + } + + // Return map + return loads; + + } + + private void ignoreOldAllocation(ReservationAllocation oldReservation) { + + // If there is no old reservation, return + if (oldReservation == null) { + return; + } + + // Subtract each allocation interval from the planModifications + for (Entry entry : oldReservation + .getAllocationRequests().entrySet()) { + + // Read the entry + ReservationInterval interval = entry.getKey(); + ReservationRequest request = entry.getValue(); + + // Find the actual request + ReservationRequest negativeRequest = + ReservationRequest.newInstance(request.getCapability(), + -1 * request.getNumContainers(), request.getConcurrency(), + request.getDuration()); + + // Insert it into planModifications as a 'negative' request, to + // represent available resources + planModifications.addInterval(interval, negativeRequest); + + } + + } + + private void validateInputStage(Plan plan, ReservationRequest rr) + throws ContractValidationException { + + // Validate concurrency + if (rr.getConcurrency() < 1) { + throw new ContractValidationException("Gang Size should be >= 1"); + } + + // Validate number of containers + if (rr.getNumContainers() <= 0) { + throw new ContractValidationException("Num containers should be > 0"); + } + + // Check that gangSize and numContainers are compatible + if (rr.getNumContainers() % rr.getConcurrency() != 0) { + throw new ContractValidationException( + "Parallelism must be an exact multiple of gang size"); + } + + // Check that the largest container request does not exceed the cluster-wide + // limit for container sizes + if (Resources.greaterThan(plan.getResourceCalculator(), capacity, + rr.getCapability(), plan.getMaximumAllocation())) { + + throw new ContractValidationException( + "Individual capability requests should not exceed cluster's " + + "maxAlloc"); + + } + + } + + // Call algEarliestStartTime() + protected long computeEarliestStartingTime(Plan plan, + ReservationDefinition reservation, int index, + ReservationRequest currentReservationStage, long stageDeadline) { + + return algStageEarliestStart.setEarliestStartTime(plan, reservation, index, + currentReservationStage, stageDeadline); + + } + + // Call algStageAllocator + protected Map + computeStageAllocation(Plan plan, ReservationRequest rr, + long stageArrivalTime, long stageDeadline) { + + return algStageAllocator.computeStageAllocation(plan, planLoads, + planModifications, rr, stageArrivalTime, stageDeadline); + + } + + // Set the algorithm: algStageEarliestStart + public IterativePlanner setAlgStageEarliestStart(IStageEarliestStart alg) { + + this.algStageEarliestStart = alg; + return this; // To allow concatenation of setAlg() functions + + } + + // Set the algorithm: algStageAllocator + public IterativePlanner setAlgStageAllocator(IStageAllocator alg) { + + this.algStageAllocator = alg; + return this; // To allow concatenation of setAlg() functions + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanningAlgorithm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanningAlgorithm.java new file mode 100644 index 0000000..a7ce1d0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanningAlgorithm.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +/** + * An abstract class that follows the general behavior of planning algorithms. + */ +public abstract class PlanningAlgorithm implements ReservationAgent { + + /** + * Performs the actual allocation for a ReservationDefinition within a Plan. + * + * @param reservationId the identifier of the reservation + * @param user the user who owns the reservation + * @param plan the Plan to which the reservation must be fitted + * @param contract encapsulates the resources required by the user for his + * session + * @param oldReservation the existing reservation (null if none) + * @return whether the allocateUser function was successful or not + * + * @throws PlanningException if the session cannot be fitted into the plan + * @throws ContractValidationException + */ + protected boolean allocateUser(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract, + ReservationAllocation oldReservation) throws PlanningException, + ContractValidationException { + + // Adjust the ResourceDefinition to account for system "imperfections" + // (e.g., scheduling delays for large containers). + ReservationDefinition adjustedContract = adjustContract(plan, contract); + + // Compute the job allocation + RLESparseResourceAllocation allocation = + computeJobAllocation(plan, reservationId, adjustedContract); + + // If no job allocation was found, fail + if (allocation == null) { + throw new PlanningException( + "The planning algorithm could not find a valid allocation" + + " for your request"); + } + + // Translate the allocation to a map (with zero paddings) + long step = plan.getStep(); + long jobArrival = stepRoundUp(adjustedContract.getArrival(), step); + long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step); + Map mapAllocations = + allocationsToPaddedMap(allocation, jobArrival, jobDeadline); + + // Create the reservation + ReservationAllocation capReservation = + new InMemoryReservationAllocation(reservationId, // ID + adjustedContract, // Contract + user, // User name + plan.getQueueName(), // Queue name + findEarliestTime(mapAllocations.keySet()), // Earliest start time + findLatestTime(mapAllocations.keySet()), // Latest end time + mapAllocations, // Allocations + plan.getResourceCalculator(), // Resource calculator + plan.getMinimumAllocation()); // Minimum allocation + + // Add (or update) the reservation allocation + if (oldReservation != null) { + return plan.updateReservation(capReservation); + } else { + return plan.addReservation(capReservation); + } + + } + + private Map + allocationsToPaddedMap(RLESparseResourceAllocation allocation, + long jobArrival, long jobDeadline) { + + // Allocate + Map mapAllocations = + allocation.toIntervalMap(); + + // Zero allocation + ReservationRequest zeroResource = + ReservationRequest.newInstance(Resource.newInstance(0, 0), 0); + + // Pad at the beginning + long earliestStart = findEarliestTime(mapAllocations.keySet()); + if (jobArrival < earliestStart) { + mapAllocations.put(new ReservationInterval(jobArrival, earliestStart), + zeroResource); + } + + // Pad at the beginning + long latestEnd = findLatestTime(mapAllocations.keySet()); + if (latestEnd < jobDeadline) { + mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline), + zeroResource); + } + + return mapAllocations; + + } + + public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan, + ReservationId reservationId, ReservationDefinition reservation) + throws PlanningException, ContractValidationException; + + @Override + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + // Allocate + return allocateUser(reservationId, user, plan, contract, null); + + } + + @Override + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + // Get the old allocation + ReservationAllocation oldAlloc = plan.getReservationById(reservationId); + + // Allocate (ignores the old allocation) + return allocateUser(reservationId, user, plan, contract, oldAlloc); + + } + + @Override + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + + // Delete the existing reservation + return plan.deleteReservation(reservationId); + + } + + protected static long findEarliestTime(Set sesInt) { + + long ret = Long.MAX_VALUE; + for (ReservationInterval s : sesInt) { + if (s.getStartTime() < ret) { + ret = s.getStartTime(); + } + } + return ret; + + } + + protected static long findLatestTime(Set sesInt) { + + long ret = Long.MIN_VALUE; + for (ReservationInterval s : sesInt) { + if (s.getEndTime() > ret) { + ret = s.getEndTime(); + } + } + return ret; + + } + + protected static long stepRoundDown(long t, long step) { + return (t / step) * step; + } + + protected static long stepRoundUp(long t, long step) { + return ((t + step - 1) / step) * step; + } + + private ReservationDefinition adjustContract(Plan plan, + ReservationDefinition originalContract) { + + // Place here adjustment. For example using QueueMetrics we can track + // large container delays per YARN-YARN-1990 + + return originalContract; + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java index 3f6f405..afd429b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java @@ -48,7 +48,7 @@ private static final int THRESHOLD = 100; private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); - private TreeMap cumulativeCapacity = + private final TreeMap cumulativeCapacity = new TreeMap(); private final ReentrantReadWriteLock readWriteLock = @@ -77,7 +77,7 @@ private boolean isSameAsNext(Long key, Resource capacity) { /** * Add a resource for the specified interval - * + * * @param reservationInterval the interval for which the resource is to be * added * @param capacity the resource to be added @@ -87,7 +87,7 @@ public boolean addInterval(ReservationInterval reservationInterval, ReservationRequest capacity) { Resource totCap = Resources.multiply(capacity.getCapability(), - (float) capacity.getNumContainers()); + capacity.getNumContainers()); if (totCap.equals(ZERO_RESOURCE)) { return true; } @@ -144,7 +144,7 @@ public boolean addInterval(ReservationInterval reservationInterval, /** * Add multiple resources for the specified interval - * + * * @param reservationInterval the interval for which the resource is to be * added * @param ReservationRequests the resources to be added @@ -170,7 +170,7 @@ public boolean addCompositeInterval(ReservationInterval reservationInterval, /** * Removes a resource for the specified interval - * + * * @param reservationInterval the interval for which the resource is to be * removed * @param capacity the resource to be removed @@ -180,7 +180,7 @@ public boolean removeInterval(ReservationInterval reservationInterval, ReservationRequest capacity) { Resource totCap = Resources.multiply(capacity.getCapability(), - (float) capacity.getNumContainers()); + capacity.getNumContainers()); if (totCap.equals(ZERO_RESOURCE)) { return true; } @@ -224,7 +224,7 @@ public boolean removeInterval(ReservationInterval reservationInterval, /** * Returns the capacity, i.e. total resources allocated at the specified point * of time - * + * * @param tick the time (UTC in ms) at which the capacity is requested * @return the resources allocated at the specified time */ @@ -243,7 +243,7 @@ public Resource getCapacityAtTime(long tick) { /** * Get the timestamp of the earliest resource allocation - * + * * @return the timestamp of the first resource allocation */ public long getEarliestStartTime() { @@ -261,7 +261,7 @@ public long getEarliestStartTime() { /** * Get the timestamp of the latest resource allocation - * + * * @return the timestamp of the last resource allocation */ public long getLatestEndTime() { @@ -279,7 +279,7 @@ public long getLatestEndTime() { /** * Returns true if there are no non-zero entries - * + * * @return true if there are no allocations or false otherwise */ public boolean isEmpty() { @@ -320,9 +320,49 @@ public String toString() { } /** + * Returns the representation of the current resources allocated over time as + * an interval map. + * + * @return the representation of the current resources allocated over time as + * an interval map. + */ + public Map toIntervalMap() { + + readLock.lock(); + try { + Map allocations = + new TreeMap(); + + // Empty + if (isEmpty()) { + return allocations; + } + + Map.Entry lastEntry = null; + for (Map.Entry entry : cumulativeCapacity.entrySet()) { + + if (lastEntry != null) { + ReservationInterval interval = + new ReservationInterval(lastEntry.getKey(), entry.getKey()); + ReservationRequest resource = + ReservationRequest.newInstance(lastEntry.getValue(), 1); + + allocations.put(interval, resource); + } + + lastEntry = entry; + } + return allocations; + } finally { + readLock.unlock(); + } + + } + + /** * Returns the JSON string representation of the current resources allocated * over time - * + * * @return the JSON string representation of the current resources allocated * over time */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/StageAllocatorGreedy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/StageAllocatorGreedy.java new file mode 100644 index 0000000..9836d3f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/StageAllocatorGreedy.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * Computes the stage allocation according to the greedy allocation rule. The + * greedy rule repeatedly allocates requested containers at the rightmost + * (latest) free interval. + */ + +public class StageAllocatorGreedy implements IStageAllocator { + + @Override + public Map computeStageAllocation( + Plan plan, Map planLoads, + RLESparseResourceAllocation planModifications, ReservationRequest rr, + long stageEarliestStart, long stageDeadline) { + + Resource totalCapacity = plan.getTotalCapacity(); + + Map allocationRequests = + new HashMap(); + + // compute the gang as a resource and get the duration + Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency()); + long dur = rr.getDuration(); + long step = plan.getStep(); + + // ceil the duration to the next multiple of the plan step + if (dur % step != 0) { + dur += (step - (dur % step)); + } + + // we know for sure that this division has no remainder (part of contract + // with user, validate before + int gangsToPlace = rr.getNumContainers() / rr.getConcurrency(); + + int maxGang = 0; + + // loop trying to place until we are done, or we are considering + // an invalid range of times + while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) { + + // as we run along we remember how many gangs we can fit, and what + // was the most constraining moment in time (we will restart just + // after that to place the next batch) + maxGang = gangsToPlace; + long minPoint = stageDeadline; + int curMaxGang = maxGang; + + // start placing at deadline (excluded due to [,) interval semantics and + // move backward + for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur + && maxGang > 0; t = t - plan.getStep()) { + + // As we run along we will logically remove the previous allocation for + // this reservation + // if one existed + // Resource oldResCap = Resource.newInstance(0, 0); + // if (oldResAllocation != null) { + // oldResCap = oldResAllocation.getResourcesAtTime(t); + // } + + // compute net available resources + Resource netAvailableRes = Resources.clone(totalCapacity); + // Resources.addTo(netAvailableRes, oldResCap); + Resources.subtractFrom(netAvailableRes, + plan.getTotalCommittedResources(t)); + Resources.subtractFrom(netAvailableRes, + planModifications.getCapacityAtTime(t)); + + // compute maximum number of gangs we could fit + curMaxGang = + (int) Math.floor(Resources.divide(plan.getResourceCalculator(), + totalCapacity, netAvailableRes, gang)); + + // pick the minimum between available resources in this instant, and how + // many gangs we have to place + curMaxGang = Math.min(gangsToPlace, curMaxGang); + + // compare with previous max, and set it. also remember *where* we found + // the minimum (useful for next attempts) + if (curMaxGang <= maxGang) { + maxGang = curMaxGang; + minPoint = t; + } + } + + // if we were able to place any gang, record this, and decrement + // gangsToPlace + if (maxGang > 0) { + gangsToPlace -= maxGang; + + ReservationInterval reservationInt = + new ReservationInterval(stageDeadline - dur, stageDeadline); + ReservationRequest reservationRes = + ReservationRequest.newInstance(rr.getCapability(), + rr.getConcurrency() * maxGang, rr.getConcurrency(), + rr.getDuration()); + // remember occupied space (plan is read-only till we find a plausible + // allocation for the entire request). This is needed since we might be + // placing other ReservationRequest within the same + // ReservationDefinition, + // and we must avoid double-counting the available resources + planModifications.addInterval(reservationInt, reservationRes); + allocationRequests.put(reservationInt, reservationRes); + + } + + // reset our new starting point (curDeadline) to the most constraining + // point so far, we will look "left" of that to find more places where + // to schedule gangs (for sure nothing on the "right" of this point can + // fit a full gang. + stageDeadline = minPoint; + } + + // if no gangs are left to place we succeed and return the allocation + if (gangsToPlace == 0) { + return allocationRequests; + } else { + // If we are here is becasue we did not manage to satisfy this request. + // So we need to remove unwanted side-effect from tempAssigned (needed + // for ANY). + for (Map.Entry tempAllocation + : allocationRequests.entrySet()) { + planModifications.removeInterval(tempAllocation.getKey(), + tempAllocation.getValue()); + } + // and return null to signal failure in this allocation + return null; + } + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/StageAllocatorLowCostAligned.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/StageAllocatorLowCostAligned.java new file mode 100644 index 0000000..f0a0b18 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/StageAllocatorLowCostAligned.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Comparator; +import java.util.Map; +import java.util.TreeSet; + +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * A stage allocator that iteratively allocates containers in the + * {@link DurationInterval} with lowest overall cost. The algorithm only + * considers intervals of the form: [stageDeadline - (n+1)*duration, + * stageDeadline - n*duration) for an integer n. This guarantees that the + * allocations are aligned (as opposed to overlapping duration intervals). + * + * The smoothnessFactor parameter controls the number of containers that are + * simultaneously allocated in each iteration of the algorithm. + */ + +public class StageAllocatorLowCostAligned implements IStageAllocator { + + // Smoothness factor + private int smoothnessFactor = 10; + + // Constructor + public StageAllocatorLowCostAligned() { + } + + // Constructor + public StageAllocatorLowCostAligned(int smoothnessFactor) { + this.smoothnessFactor = smoothnessFactor; + } + + // computeJobAllocation() + @Override + public Map computeStageAllocation( + Plan plan, Map planLoads, + RLESparseResourceAllocation planModifications, ReservationRequest rr, + long stageEarliestStart, long stageDeadline) { + + // Initialize + ResourceCalculator resCalc = plan.getResourceCalculator(); + Resource capacity = plan.getTotalCapacity(); + long step = plan.getStep(); + + // Create allocationRequestsearlies + RLESparseResourceAllocation allocationRequests = + new RLESparseResourceAllocation(plan.getResourceCalculator(), + plan.getMinimumAllocation()); + + // Initialize parameters + long duration = stepRoundUp(rr.getDuration(), step); + int windowSizeInDurations = + (int) ((stageDeadline - stageEarliestStart) / duration); + int totalGangs = rr.getNumContainers() / rr.getConcurrency(); + int numContainersPerGang = rr.getConcurrency(); + Resource gang = + Resources.multiply(rr.getCapability(), numContainersPerGang); + + // Set maxGangsPerUnit + int maxGangsPerUnit = + (int) Math.max( + Math.floor(((double) totalGangs) / windowSizeInDurations), 1); + maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1); + + // If window size is too small, return null + if (windowSizeInDurations <= 0) { + return null; + } + + // Initialize tree sorted by costs + TreeSet durationIntervalsSortedByCost = + new TreeSet(new Comparator() { + @Override + public int compare(DurationInterval val1, DurationInterval val2) { + + int cmp = Double.compare(val1.getTotalCost(), val2.getTotalCost()); + if (cmp != 0) { + return cmp; + } + + return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime()); + } + }); + + // Add durationIntervals that end at (endTime - n*duration) for some n. + for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart + + duration; intervalEnd -= duration) { + + long intervalStart = intervalEnd - duration; + + // Get duration interval [intervalStart,intervalEnd) + DurationInterval durationInterval = + getDurationInterval(intervalStart, intervalEnd, planLoads, + planModifications, capacity, resCalc, step); + + // If the interval can fit a gang, add it to the tree + if (durationInterval.canAllocate(gang, capacity, resCalc)) { + durationIntervalsSortedByCost.add(durationInterval); + } + } + + // Allocate + int remainingGangs = totalGangs; + while (remainingGangs > 0) { + + // If no durationInterval can fit a gang, break and return null + if (durationIntervalsSortedByCost.isEmpty()) { + break; + } + + // Get best duration interval + DurationInterval bestDurationInterval = + durationIntervalsSortedByCost.first(); + int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs); + + // Add it + remainingGangs -= numGangsToAllocate; + + ReservationInterval reservationInt = + new ReservationInterval(bestDurationInterval.getStartTime(), + bestDurationInterval.getEndTime()); + + ReservationRequest reservationRes = + ReservationRequest.newInstance(rr.getCapability(), + rr.getConcurrency() * numGangsToAllocate, rr.getConcurrency(), + duration); + + planModifications.addInterval(reservationInt, reservationRes); + allocationRequests.addInterval(reservationInt, reservationRes); + + // Remove from tree + durationIntervalsSortedByCost.remove(bestDurationInterval); + + // Get updated interval + DurationInterval updatedDurationInterval = + getDurationInterval(bestDurationInterval.getStartTime(), + bestDurationInterval.getStartTime() + duration, planLoads, + planModifications, capacity, resCalc, step); + + // Add to tree, if possible + if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) { + durationIntervalsSortedByCost.add(updatedDurationInterval); + } + + } + + // Get the final allocation + Map allocations = + allocationRequests.toIntervalMap(); + + // If no gangs are left to place we succeed and return the allocation + if (remainingGangs <= 0) { + return allocations; + } else { + + // If we are here is because we did not manage to satisfy this request. + // We remove unwanted side-effect from planModifications (needed for ANY). + for (Map.Entry tempAllocation + : allocations.entrySet()) { + + planModifications.removeInterval(tempAllocation.getKey(), + tempAllocation.getValue()); + + } + // Return null to signal failure in this allocation + return null; + + } + + } + + protected DurationInterval getDurationInterval(long startTime, long endTime, + Map planLoads, + RLESparseResourceAllocation planModifications, Resource capacity, + ResourceCalculator resCalc, long step) { + + // Initialize the dominant loads structure + Resource dominantResources = Resource.newInstance(0, 0); + + // Calculate totalCost and maxLoad + double totalCost = 0.0; + for (long t = startTime; t < endTime; t += step) { + + // Get the load + Resource load = getLoadAtTime(t, planLoads, planModifications); + + // Increase the total cost + totalCost += calcCostOfLoad(load, capacity, resCalc); + + // Update the dominant resources + dominantResources = Resources.componentwiseMax(dominantResources, load); + + } + + // Return the corresponding durationInterval + return new DurationInterval(startTime, endTime, totalCost, + dominantResources); + + } + + protected double calcCostOfInterval(long startTime, long endTime, + Map planLoads, + RLESparseResourceAllocation planModifications, Resource capacity, + ResourceCalculator resCalc, long step) { + + // Sum costs in the interval [startTime,endTime) + double totalCost = 0.0; + for (long t = startTime; t < endTime; t += step) { + totalCost += calcCostOfTimeSlot(t, planLoads, planModifications, capacity, + resCalc); + } + + // Return sum + return totalCost; + + } + + protected double calcCostOfTimeSlot(long t, Map planLoads, + RLESparseResourceAllocation planModifications, Resource capacity, + ResourceCalculator resCalc) { + + // Get the current load at time t + Resource load = getLoadAtTime(t, planLoads, planModifications); + + // Return cost + return calcCostOfLoad(load, capacity, resCalc); + + } + + protected Resource getLoadAtTime(long t, Map planLoads, + RLESparseResourceAllocation planModifications) { + + Resource planLoad = planLoads.get(t); + planLoad = (planLoad == null) ? Resource.newInstance(0, 0) : planLoad; + + return Resources.add(planLoad, planModifications.getCapacityAtTime(t)); + + } + + protected double calcCostOfLoad(Resource load, Resource capacity, + ResourceCalculator resCalc) { + + return resCalc.ratio(load, capacity); + + } + + protected static long stepRoundDown(long t, long step) { + return (t / step) * step; + } + + protected static long stepRoundUp(long t, long step) { + return ((t + step - 1) / step) * step; + } + + /** + * An inner class that represents an interval, typically of length duration. + * The class holds the total cost of the interval and the maximal load inside + * the interval in each dimension (both calculated externally). + */ + protected static class DurationInterval { + + private long startTime; + private long endTime; + private double cost; + private Resource maxLoad; + + // Constructor + public DurationInterval(long startTime, long endTime, double cost, + Resource maxLoad) { + this.startTime = startTime; + this.endTime = endTime; + this.cost = cost; + this.maxLoad = maxLoad; + } + + // canAllocate() - boolean function, returns whether requestedResources + // can be allocated during the durationInterval without + // violating capacity constraints + public boolean canAllocate(Resource requestedResources, Resource capacity, + ResourceCalculator resCalc) { + + Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources); + return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0); + + } + + // numCanFit() - returns the maximal number of requestedResources can be + // allocated during the durationInterval without violating + // capacity constraints + public int numCanFit(Resource requestedResources, Resource capacity, + ResourceCalculator resCalc) { + + // Represents the largest resource demand that can be satisfied throughout + // the entire DurationInterval (i.e., during [startTime,endTime)) + Resource availableResources = Resources.subtract(capacity, maxLoad); + + // Maximal number of requestedResources that fit inside the interval + return (int) Math.floor(Resources.divide(resCalc, capacity, + availableResources, requestedResources)); + + } + + public long getStartTime() { + return this.startTime; + } + + public void setStartTime(long value) { + this.startTime = value; + } + + public long getEndTime() { + return this.endTime; + } + + public void setEndTime(long value) { + this.endTime = value; + } + + public Resource getMaxLoad() { + return this.maxLoad; + } + + public void setMaxLoad(Resource value) { + this.maxLoad = value; + } + + public double getTotalCost() { + return this.cost; + } + + public void setTotalCost(double value) { + this.cost = value; + } + + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/StageEarliestStartByDemand.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/StageEarliestStartByDemand.java new file mode 100644 index 0000000..a34401d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/StageEarliestStartByDemand.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.ListIterator; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; + +/** + * Sets the earliest start time of a stage proportional to the job weight. The + * interval [jobArrival, stageDeadline) is divided as follows. First, each stage + * is guaranteed at least its requested duration. Then, the stage receives a + * fraction of the remaining time. The fraction is calculated as the ratio + * between the weight (total requested resources) of the stage and the total + * weight of all proceeding stages. + */ + +public class StageEarliestStartByDemand implements IStageEarliestStart { + + private long step; + + @Override + public long setEarliestStartTime(Plan plan, + ReservationDefinition reservation, int index, ReservationRequest current, + long stageDeadline) { + + step = plan.getStep(); + + // If this is the first stage, don't bother with the computation. + if (index < 1) { + return reservation.getArrival(); + } + + // Get iterator + ListIterator li = + reservation.getReservationRequests().getReservationResources() + .listIterator(index); + ReservationRequest rr; + + // Calculate the total weight & total duration + double totalWeight = calcWeight(current); + long totalDuration = getRoundedDuration(current, plan); + + while (li.hasPrevious()) { + rr = li.previous(); + totalWeight += calcWeight(rr); + totalDuration += getRoundedDuration(rr, plan); + } + + // Compute the weight of the current stage as compared to remaining ones + double ratio = calcWeight(current) / totalWeight; + + // Estimate an early start time, such that: + // 1. Every stage is guaranteed to receive at least its duration + // 2. The remainder of the window is divided between stages + // proportionally to its workload (total memory consumption) + long window = stageDeadline - reservation.getArrival(); + long windowRemainder = window - totalDuration; + long earlyStart = + (long) (stageDeadline - getRoundedDuration(current, plan) + - (windowRemainder * ratio)); + + // Realign if necessary (since we did some arithmetic) + earlyStart = stepRoundUp(earlyStart, step); + + // Return + return earlyStart; + + } + + // Weight = total memory consumption of stage + protected double calcWeight(ReservationRequest stage) { + return (stage.getDuration() * stage.getCapability().getMemory()) + * (stage.getNumContainers()); + } + + protected long getRoundedDuration(ReservationRequest stage, Plan plan) { + return stepRoundUp(stage.getDuration(), step); + } + + protected static long stepRoundDown(long t, long step) { + return (t / step) * step; + } + + protected static long stepRoundUp(long t, long step) { + return ((t + step - 1) / step) * step; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/StageEarliestStartByJobArrival.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/StageEarliestStartByJobArrival.java new file mode 100644 index 0000000..8c59356 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/StageEarliestStartByJobArrival.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; + +/** + * Sets the earliest start time of a stage as the job arrival time. + */ +public class StageEarliestStartByJobArrival implements IStageEarliestStart { + + @Override + public long setEarliestStartTime(Plan plan, + ReservationDefinition reservation, int index, ReservationRequest current, + long stageDeadline) { + + return reservation.getArrival(); + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TryManyReservationAgents.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TryManyReservationAgents.java new file mode 100644 index 0000000..4c666b3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TryManyReservationAgents.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +/** + * A planning algorithm that invokes several other planning algorithms according + * to a given order. If one of the planners succeeds, the allocation it + * generates is returned. + */ +public class TryManyReservationAgents implements ReservationAgent { + + // Planning algorithms + private final List algs; + + // Constructor + public TryManyReservationAgents(List algs) { + this.algs = new LinkedList(algs); + } + + @Override + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + // Save the planning exception + PlanningException planningException = null; + + // Try all of the algorithms, in order + for (ReservationAgent alg : algs) { + + try { + if (alg.createReservation(reservationId, user, plan, contract)) { + return true; + } + } catch (PlanningException e) { + planningException = e; + } + + } + + // If all of the algorithms failed and one of the algorithms threw an + // exception, throw the last planning exception + if (planningException != null) { + throw planningException; + } + + // If all of the algorithms failed, return false + return false; + + } + + @Override + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + // Save the planning exception + PlanningException planningException = null; + + // Try all of the algorithms, in order + for (ReservationAgent alg : algs) { + + try { + if (alg.updateReservation(reservationId, user, plan, contract)) { + return true; + } + } catch (PlanningException e) { + planningException = e; + } + + } + + // If all of the algorithms failed and one of the algorithms threw an + // exception, throw the last planning exception + if (planningException != null) { + throw planningException; + } + + // If all of the algorithms failed, return false + return false; + + } + + @Override + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + + return plan.deleteReservation(reservationId); + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestAlignedPlanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestAlignedPlanner.java new file mode 100644 index 0000000..d2e23cd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestAlignedPlanner.java @@ -0,0 +1,798 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Test; +import org.mortbay.log.Log; + +public class TestAlignedPlanner { + + ReservationAgent agent; + InMemoryPlan plan; + Resource minAlloc = Resource.newInstance(1024, 1); + ResourceCalculator res = new DefaultResourceCalculator(); + Resource maxAlloc = Resource.newInstance(1024 * 8, 8); + Random rand = new Random(); + long step; + + @Test + public void testSingleReservationAccept() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario1(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 5 * step, // Job arrival time + 20 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(2048, 2), // Capability + 10, // Num containers + 5, // Concurrency + 10 * step) }, // Duration + ReservationRequestInterpreter.R_ORDER, "u1"); + + // Add reservation + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == numJobsInScenario + 1); + + // Get reservation + ReservationAllocation alloc1 = plan.getReservationById(reservationID); + + // Verify allocation + assertTrue(alloc1.toString(), + check(alloc1, 10 * step, 20 * step, 10, 2048, 2)); + + } + + @Test + public void testOrderNoGapImpossible() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10L, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step), // Duration + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ORDER_NO_GAP, "u1"); + + // Add reservation + try { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + fail(); + } catch (PlanningException e) { + // Expected failure + } + + // CHECK: allocation was not accepted + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == numJobsInScenario); + + } + + @Test + public void testOrderNoGapImpossible2() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 13 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step), // Duration + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 10, // Num containers + 10, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ORDER_NO_GAP, "u1"); + + // Add reservation + try { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + fail(); + } catch (PlanningException e) { + // Expected failure + } + + // CHECK: allocation was not accepted + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == numJobsInScenario); + + } + + @Test + public void testOrderImpossible() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 2 * step), // Duration + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ORDER, "u1"); + + // Add reservation + try { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + fail(); + } catch (PlanningException e) { + // Expected failure + } + + // CHECK: allocation was not accepted + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == numJobsInScenario); + + } + + @Test + public void testAnyImpossible() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 3 * step), // Duration + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 2 * step) }, // Duration + ReservationRequestInterpreter.R_ANY, "u1"); + + // Add reservation + try { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + fail(); + } catch (PlanningException e) { + // Expected failure + } + + // CHECK: allocation was not accepted + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == numJobsInScenario); + + } + + @Test + public void testAnyAccept() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step), // Duration + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 2 * step) }, // Duration + ReservationRequestInterpreter.R_ANY, "u1"); + + // Add reservation + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == numJobsInScenario + 1); + + // Get reservation + ReservationAllocation alloc1 = plan.getReservationById(reservationID); + + // Verify allocation + assertTrue(alloc1.toString(), + check(alloc1, 14 * step, 15 * step, 20, 1024, 1)); + + } + + @Test + public void testAllAccept() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step), // Duration + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Add reservation + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == numJobsInScenario + 1); + + // Get reservation + ReservationAllocation alloc1 = plan.getReservationById(reservationID); + + // Verify allocation + assertTrue(alloc1.toString(), + check(alloc1, 10 * step, 11 * step, 20, 1024, 1)); + assertTrue(alloc1.toString(), + check(alloc1, 14 * step, 15 * step, 20, 1024, 1)); + + } + + @Test + public void testAllImpossible() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step), // Duration + ReservationRequest.newInstance(Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 2 * step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Add reservation + try { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + fail(); + } catch (PlanningException e) { + // Expected failure + } + + // CHECK: allocation was not accepted + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == numJobsInScenario); + + } + + @Test + public void testUpdate() throws PlanningException { + + // Create flexible reservation + ReservationDefinition rrFlex = + createReservationDefinition( + 10 * step, // Job arrival time + 14 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 100, // Num containers + 1, // Concurrency + 2 * step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Create blocking reservation + ReservationDefinition rrBlock = + createReservationDefinition( + 10 * step, // Job arrival time + 11 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 100, // Num containers + 100, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Create reservation IDs + ReservationId flexReservationID = + ReservationSystemTestUtil.getNewReservationId(); + ReservationId blockReservationID = + ReservationSystemTestUtil.getNewReservationId(); + + // Add block, add flex, remove block, update flex + agent.createReservation(blockReservationID, "uBlock", plan, rrBlock); + agent.createReservation(flexReservationID, "uFlex", plan, rrFlex); + agent.deleteReservation(blockReservationID, "uBlock", plan); + agent.updateReservation(flexReservationID, "uFlex", plan, rrFlex); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", flexReservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 1); + + // Get reservation + ReservationAllocation alloc1 = plan.getReservationById(flexReservationID); + + // Verify allocation + assertTrue(alloc1.toString(), + check(alloc1, 10 * step, 14 * step, 50, 1024, 1)); + + } + + @Test + public void testImpossibleDuration() throws PlanningException { + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 10 * step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Add reservation + try { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + fail(); + } catch (PlanningException e) { + // Expected failure + } + + // CHECK: allocation was not accepted + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == 0); + + } + + @Test + public void testLoadedDurationIntervals() throws PlanningException { + + int numJobsInScenario = initializeScenario3(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 13 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 80, // Num containers + 10, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Add reservation + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == numJobsInScenario + 1); + + // Get reservation + ReservationAllocation alloc1 = plan.getReservationById(reservationID); + + // Verify allocation + assertTrue(alloc1.toString(), + check(alloc1, 10 * step, 11 * step, 20, 1024, 1)); + assertTrue(alloc1.toString(), + check(alloc1, 11 * step, 12 * step, 20, 1024, 1)); + assertTrue(alloc1.toString(), + check(alloc1, 12 * step, 13 * step, 40, 1024, 1)); + } + + @Test + public void testCostFunction() throws PlanningException { + + // Create large memory reservation + ReservationDefinition rr7Mem1Core = + createReservationDefinition( + 10 * step, // Job arrival time + 11 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(7 * 1024, 1),// Capability + 1, // Num containers + 1, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Create reservation + ReservationDefinition rr6Mem6Cores = + createReservationDefinition( + 10 * step, // Job arrival time + 11 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(6 * 1024, 6),// Capability + 1, // Num containers + 1, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u2"); + + // Create reservation + ReservationDefinition rr = + createReservationDefinition( + 10 * step, // Job arrival time + 12 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 1, // Num containers + 1, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u3"); + + // Create reservation IDs + ReservationId reservationID1 = + ReservationSystemTestUtil.getNewReservationId(); + ReservationId reservationID2 = + ReservationSystemTestUtil.getNewReservationId(); + ReservationId reservationID3 = + ReservationSystemTestUtil.getNewReservationId(); + + // Add all + agent.createReservation(reservationID1, "u1", plan, rr7Mem1Core); + agent.createReservation(reservationID2, "u2", plan, rr6Mem6Cores); + agent.createReservation(reservationID3, "u3", plan, rr); + + // Get reservation + ReservationAllocation alloc3 = plan.getReservationById(reservationID3); + + assertTrue(alloc3.toString(), + check(alloc3, 10 * step, 11 * step, 0, 1024, 1)); + assertTrue(alloc3.toString(), + check(alloc3, 11 * step, 12 * step, 1, 1024, 1)); + + } + + @Test + public void testFromCluster() throws PlanningException { + + // int numJobsInScenario = initializeScenario3(); + + List list = new ArrayList(); + + // Create reservation + list.add(createReservationDefinition( + 1425716392178L, // Job arrival time + 1425722262791L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 7, // Num containers + 1, // Concurrency + 587000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1")); + + list.add(createReservationDefinition( + 1425716406178L, // Job arrival time + 1425721255841L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 6, // Num containers + 1, // Concurrency + 485000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u2")); + + list.add(createReservationDefinition( + 1425716399178L, // Job arrival time + 1425723780138L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 6, // Num containers + 1, // Concurrency + 738000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u3")); + + list.add(createReservationDefinition( + 1425716437178L, // Job arrival time + 1425722968378L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 7, // Num containers + 1, // Concurrency + 653000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u4")); + + list.add(createReservationDefinition( + 1425716406178L, // Job arrival time + 1425721926090L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 6, // Num containers + 1, // Concurrency + 552000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u5")); + + list.add(createReservationDefinition( + 1425716379178L, // Job arrival time + 1425722238553L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 6, // Num containers + 1, // Concurrency + 586000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u6")); + + list.add(createReservationDefinition( + 1425716407178L, // Job arrival time + 1425722908317L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 7, // Num containers + 1, // Concurrency + 650000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u7")); + + list.add(createReservationDefinition( + 1425716452178L, // Job arrival time + 1425722841562L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 6, // Num containers + 1, // Concurrency + 639000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u8")); + + list.add(createReservationDefinition( + 1425716384178L, // Job arrival time + 1425721766129L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 7, // Num containers + 1, // Concurrency + 538000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u9")); + + list.add(createReservationDefinition( + 1425716437178L, // Job arrival time + 1425722507886L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 5, // Num containers + 1, // Concurrency + 607000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u10")); + + // Add reservation + int i = 1; + for (ReservationDefinition rr : list) { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u" + Integer.toString(i), plan, + rr); + ++i; + } + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == list.size()); + + } + + @Before + public void setup() throws Exception { + + // Initialize random seed + long seed = rand.nextLong(); + rand.setSeed(seed); + Log.info("Running with seed: " + seed); + + // Set cluster parameters + long timeWindow = 1000000L; + int capacityMem = 100 * 1024; + int capacityCores = 100; + step = 60000L; + + Resource clusterCapacity = Resource.newInstance(capacityMem, capacityCores); + + // Set configuration + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + String reservationQ = testUtil.getFullReservationQueueName(); + float instConstraint = 100; + float avgConstraint = 100; + + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); + + CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + policy.init(reservationQ, conf); + + QueueMetrics queueMetrics = mock(QueueMetrics.class); + + // Set planning agent + agent = new AlignedPlannerWithGreedy(); + + // Create Plan + plan = + new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, + res, minAlloc, maxAlloc, "dedicated", null, true); + } + + private int initializeScenario1() throws PlanningException { + + // insert in the reservation a couple of controlled reservations, to create + // conditions for assignment that are non-empty + + addFixedAllocation(0L, step, new int[] { 10, 10, 20, 20, 20, 10, 10 }); + + System.out.println("--------BEFORE AGENT----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + return 1; + + } + + private int initializeScenario2() throws PlanningException { + + // insert in the reservation a couple of controlled reservations, to create + // conditions for assignment that are non-empty + + addFixedAllocation(11 * step, step, new int[] { 90, 90, 90 }); + + System.out.println("--------BEFORE AGENT----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + return 1; + + } + + private int initializeScenario3() throws PlanningException { + + // insert in the reservation a couple of controlled reservations, to create + // conditions for assignment that are non-empty + + addFixedAllocation(10 * step, step, new int[] { 70, 80, 60 }); + + System.out.println("--------BEFORE AGENT----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + return 1; + + } + + private void addFixedAllocation(long start, long step, int[] f) + throws PlanningException { + + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, + "user_fixed", "dedicated", start, start + f.length * step, + ReservationSystemTestUtil.generateAllocation(start, step, f), res, + minAlloc))); + + } + + private ReservationDefinition createReservationDefinition(long arrival, + long deadline, ReservationRequest[] reservationRequests, + ReservationRequestInterpreter rType, String username) { + + return ReservationDefinition.newInstance(arrival, deadline, + ReservationRequests.newInstance(Arrays.asList(reservationRequests), + rType), username); + + } + + private boolean check(ReservationAllocation alloc, long start, long end, + int containers, int mem, int cores) { + + Resource expectedResources = + Resource.newInstance(mem * containers, cores * containers); + + // Verify that all allocations in [start,end) equal containers * (mem,cores) + for (long i = start; i < end; i++) { + if (!Resources.equals(alloc.getResourcesAtTime(i), expectedResources)) { + return false; + } + } + return true; + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java index c7301c7..295c8b9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -164,6 +164,58 @@ public void testZeroAlloaction() { Assert.assertTrue(rleSparseVector.isEmpty()); } + @Test + public void testToIntervalMap() { + ResourceCalculator resCalc = new DefaultResourceCalculator(); + Resource minAlloc = Resource.newInstance(1, 1); + RLESparseResourceAllocation rleSparseVector = + new RLESparseResourceAllocation(resCalc, minAlloc); + Map mapAllocations; + + // Check empty + mapAllocations = rleSparseVector.toIntervalMap(); + Assert.assertTrue(mapAllocations.isEmpty()); + + // Check full + int[] alloc = { 0, 5, 10, 10, 5, 0, 5, 0 }; + int start = 100; + Set> inputs = + generateAllocation(start, alloc, false).entrySet(); + for (Entry ip : inputs) { + rleSparseVector.addInterval(ip.getKey(), ip.getValue()); + } + mapAllocations = rleSparseVector.toIntervalMap(); + Assert.assertTrue(mapAllocations.size() == 5); + for (Entry entry : mapAllocations + .entrySet()) { + ReservationInterval interval = entry.getKey(); + ReservationRequest request = entry.getValue(); + if (interval.getStartTime() == 101L) { + Assert.assertTrue(interval.getEndTime() == 102L); + Assert.assertEquals(request.getCapability(), + Resource.newInstance(5 * 1024, 5)); + } else if (interval.getStartTime() == 102L) { + Assert.assertTrue(interval.getEndTime() == 104L); + Assert.assertEquals(request.getCapability(), + Resource.newInstance(10 * 1024, 10)); + } else if (interval.getStartTime() == 104L) { + Assert.assertTrue(interval.getEndTime() == 105L); + Assert.assertEquals(request.getCapability(), + Resource.newInstance(5 * 1024, 5)); + } else if (interval.getStartTime() == 105L) { + Assert.assertTrue(interval.getEndTime() == 106L); + Assert.assertEquals(request.getCapability(), + Resource.newInstance(0 * 1024, 0)); + } else if (interval.getStartTime() == 106L) { + Assert.assertTrue(interval.getEndTime() == 107L); + Assert.assertEquals(request.getCapability(), + Resource.newInstance(5 * 1024, 5)); + } else { + Assert.fail(); + } + } + } + private Map generateAllocation( int startTime, int[] alloc, boolean isStep) { Map req =