diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java new file mode 100644 index 0000000..7f64e22 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java @@ -0,0 +1,48 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.io.IOException; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +/** + * This is the interface for policy that validate new + * {@link ReservationAllocation}s for allocations being added to a {@link Plan}. + * Individual policies will be enforcing different invariants. + */ +public interface SharingPolicy { + + /** + * Initialize this policy + * @param inventoryQueuePath the name of the queue for this plan + * @param conf + */ + public void init(String inventoryQueuePath, Configuration conf, Set excludeList); + + /** + * This method runs the policy validation logic, and return true/false on + * whether the {@link ReservationAllocation} is acceptable according to this + * sharing policy. + * + * @param plan the {@link Plan} we validate against + * @param newAllocation the allocation proposed to be added to the + * {@link Plan} + * @throws PlanningException if the policy is respected if we add this + * {@link ReservationAllocation} to the {@link Plan} + * @throws IOException + */ + public void validate(Plan plan, ReservationAllocation newAllocation) + throws PlanningException; + + /** + * Returns the time range before and after the current reservation considered + * by this policy. In particular, this informs the archival process for the + * {@link Plan}, i.e., reservations regarding times before (now - validWindow) + * can be deleted. + * + * @return validWindow the window of validity considered by the policy. + */ + public long getValidWindow(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java new file mode 100644 index 0000000..a06ecf7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java @@ -0,0 +1,242 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Date; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchingUserException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * This policy enforces a time-extended notion of Capacity. In particular it + * guarantees that the allocation received in input when combined with all + * previous allocation for the user does not violate an instantaneous max limit + * on the resources received, and that for every window of time of length + * validWindow, the integral of the allocations for a user (sum of the currently + * submitted allocation and all prior allocations for the user) does not exceed + * validWindow * maxAvg. + * + * This allows flexibility, in the sense that an allocation can instantaneously + * use large portions of the available capacity, but prevents abuses by bounding + * the average use over time. + * + * By controlling maxInst, maxAvg, validWindow the administrator configuring + * this policy can obtain a behavior ranging from instantaneously enforced + * capacity (akin to existing queues), or fully flexible allocations (likely + * reserved to super-users, or trusted systems). + */ +public class CapacityOverTimePolicy implements SharingPolicy { + + private CapacitySchedulerConfiguration conf; + private long validWindow; + private float maxInst; + private float maxAvg; + private Set excludeList; + + + // For now this is CapacityScheduler specific, but given a hierarchy in the + // configuration structure of the schedulers (e.g., SchedulerConfiguration) + // it should be easy to remove this limitation + @Override + public void init(String reservationQueuePath, Configuration conf, + Set excludeList) { + + if (CapacitySchedulerConfiguration.class.isInstance(conf)) { + this.conf = (CapacitySchedulerConfiguration) conf; + } else { + throw new RuntimeException( + "Unsupported conf type, only supported type is capacity based."); + } + validWindow = this.conf.getReservationWindow(reservationQueuePath); + maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100; + maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100; + this.excludeList = excludeList; + }; + + @Override + public void validate(Plan plan, ReservationAllocation reservation) + throws PlanningException { + + // this is entire method invoked under a write-lock on the plan, no need + // to synchronize accesses to the plan further + + // The default-queue reservation is infinitely long, and guaranteed to be + // valid. Skipping validation. + if (excludeList.contains(reservation.getReservationId())) { + return; + } + + // Try to verify whether there is already a reservation with this ID in + // the system (remove its contribution during validation to simulate a try-n-swap + // update). + ReservationAllocation oldReservation = plan.getReservationById(reservation + .getReservationId()); + + // sanity check that the update of a reservation is not changing username + if (oldReservation != null + && !oldReservation.getUser().equals(reservation.getUser())) { + throw new MismatchingUserException( + "Updating an existing reservation with mismatched user:" + + oldReservation.getUser() + " != " + reservation.getUser()); + } + + long startTime = reservation.getStartTime(); + long endTime = reservation.getEndTime(); + long step = plan.getStep(); + + Resource planTotalCapacity = plan.getTotalCapacity(); + + Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg); + Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst); + + // define variable that will store integral of resources (need diff class to + // avoid overflow issues for long/large allocations) + IntegralResource runningTot = new IntegralResource(0L, 0L); + IntegralResource maxAllowed = new IntegralResource(maxAvgRes); + maxAllowed.multiplyBy(validWindow / step); + + // check that the resources offered to the user during any window of length + // "validWindow" overlapping this allocation are within maxAllowed + // also enforce instantaneous and physical constraints during this pass + for (long t = startTime - validWindow; t < endTime + validWindow; t += step) { + + Resource currExistingAllocTot = plan.getTotalCommittedResources(t); + Resource currExistingAllocForUser = + plan.getConsumptionForUser(reservation.getUser(), t); + Resource currNewAlloc = reservation.getResourcesAtTime(t); + Resource currOldAlloc = Resources.none(); + if (oldReservation != null) { + currOldAlloc = oldReservation.getResourcesAtTime(t); + } + + // throw exception if the cluster is overcommitted + // tot_allocated - old + new > capacity + Resource inst = Resources.subtract( + Resources.add(currExistingAllocTot, currNewAlloc), currOldAlloc); + if (Resources.greaterThan(plan.getResourceCalculator(), + planTotalCapacity, inst, planTotalCapacity)) { + throw new ResourceOverCommitException(" Resources at time " + t + + " would be overcommitted (" + inst + " over " + + plan.getTotalCapacity() + ")by " + "accepting reservation: " + + reservation.getReservationId()); + } + + // throw exception if instantaneous limits are violated + // tot_alloc_to_this_user - old + new > inst_limit + if (Resources.greaterThan(plan.getResourceCalculator(), + planTotalCapacity, Resources.subtract( + Resources.add(currExistingAllocForUser, currNewAlloc), + currOldAlloc), maxInsRes)) { + throw new PlanningQuotaException("Instantaneous quota capacity " + + maxInst + " would be passed at time " + t + + " by accepting reservation: " + reservation.getReservationId()); + } + + // throw exception if the running integral of utilization over validWindow + // is violated. We perform a delta check, adding/removing instants at the + // boundary of the window from runningTot. + + // runningTot = previous_runningTot + currExistingAllocForUser + + // currNewAlloc - currOldAlloc - pastNewAlloc - pastOldAlloc; + + // Where: + // 1) currNewAlloc, currExistingAllocForUser represent the contribution of + // the instant in time added in this pass. + // 2) pastNewAlloc, pastOldAlloc are the contributions relative to time + // instants that are being retired from the the window + // 3) currOldAlloc is the contribution (if any) of the previous version of + // this reservation (the one we are updating) + + runningTot.add(currExistingAllocForUser); + runningTot.add(currNewAlloc); + runningTot.subtract(currOldAlloc); + + // expire contributions from instant in time before (t - validWindow) + if (t > startTime) { + Resource pastOldAlloc = + plan.getConsumptionForUser(reservation.getUser(), t - validWindow); + Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow); + + // runningTot = runningTot - pastExistingAlloc - pastNewAlloc; + runningTot.subtract(pastOldAlloc); + runningTot.subtract(pastNewAlloc); + } + + // check integral + // runningTot > maxAvg * validWindow + // NOTE: we need to use comparator of IntegralResource directly, as + // Resource and ResourceCalculator assume "int" amount of resources, + // which is not sufficient when comparing integrals (out-of-bound) + if (maxAllowed.compareTo(runningTot) < 0) { + throw new PlanningQuotaException( + "Integral (avg over time) quota capacity " + maxAvg + + " over a window of " + validWindow / 1000 + " seconds, " + + " would be passed at time " + t + "(" + new Date(t) + + ") by accepting reservation: " + + reservation.getReservationId()); + } + } + } + + @Override + public long getValidWindow() { + return validWindow; + } + + /** + * This class provides support for Resource-like book-keeping, based on long(s), + * as using Resource to store the "integral" of the allocation over time leads + * to integer overflows for large allocations/clusters. (Evolving Resource to + * use long is too disruptive at this point.) + * + * The comparison/multiplication behaviors of IntegralResource are consistent + * with the DefaultResourceCalculator. + */ + public class IntegralResource { + long memory; + long vcores; + + public IntegralResource(Resource resource) { + this.memory = resource.getMemory(); + this.vcores = resource.getVirtualCores(); + } + + public IntegralResource(long mem, long vcores) { + this.memory = mem; + this.vcores = vcores; + } + + public void add(Resource r) { + memory += r.getMemory(); + vcores += r.getVirtualCores(); + } + + public void subtract(Resource r) { + memory -= r.getMemory(); + vcores -= r.getVirtualCores(); + } + + public void multiplyBy(long window) { + memory = memory * window; + vcores = vcores * window; + } + + public long compareTo(IntegralResource other) { + long diff = memory - other.memory; + if (diff == 0) { + diff = vcores - other.vcores; + } + return diff; + } + + @Override + public String toString() { + return ""; + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java new file mode 100644 index 0000000..2e4c668 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java @@ -0,0 +1,81 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchingUserException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * This policy enforce a simple physical cluster capacity constraints, by + * validating that the allocation proposed fits in the current plan. This + * validation is compatible with "updates" and in verifying the capacity + * constraints it conceptually remove the prior version of the reservation. + */ +public class NoOverCommitPolicy implements SharingPolicy { + + private Set excludeList; + + @Override + public void validate(Plan plan, ReservationAllocation reservation) + throws PlanningException { + + ReservationAllocation oldReservation = + plan.getReservationById(reservation.getReservationId()); + + // The default-queue reservation is infinitely long, and guaranteed to be + // valid. Skipping validation. + if (excludeList.contains(reservation.getReservationId())) { + return; + } + + // check updates are using same name + if (oldReservation != null + && !oldReservation.getUser().equals(reservation.getUser())) { + throw new MismatchingUserException( + "Updating an existing reservation with mismatching user:" + + oldReservation.getUser() + " != " + reservation.getUser()); + } + + long startTime = reservation.getStartTime(); + long endTime = reservation.getEndTime(); + long step = plan.getStep(); + + // for every instant in time, check we are respecting cluster capacity + for (long t = startTime; t < endTime; t += step) { + Resource currExistingAllocTot = plan.getTotalCommittedResources(t); + Resource currNewAlloc = reservation.getResourcesAtTime(t); + Resource currOldAlloc = Resource.newInstance(0, 0); + if (oldReservation != null) { + oldReservation.getResourcesAtTime(t); + } + // check the cluster is never over committed + // currExistingAllocTot + currNewAlloc - currOldAlloc > + // capPlan.getTotalCapacity() + if (Resources.greaterThan(plan.getResourceCalculator(), plan + .getTotalCapacity(), Resources.subtract( + Resources.add(currExistingAllocTot, currNewAlloc), currOldAlloc), + plan.getTotalCapacity())) { + throw new ResourceOverCommitException("Resources at time " + t + + " would be overcommitted by " + "accepting reservation: " + + reservation.getReservationId()); + } + } + } + + @Override + public void init(String reservationQueueName, Configuration conf, + Set excludeList) { + this.excludeList = excludeList; + } + + @Override + public long getValidWindow() { + // this policy has no "memory" so the valid window is set to zero + return 0; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchingUserException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchingUserException.java new file mode 100644 index 0000000..622f984 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchingUserException.java @@ -0,0 +1,19 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions; + +public class MismatchingUserException extends PlanningException { + + private static final long serialVersionUID = 8313222590561668413L; + + public MismatchingUserException(String message) { + super(message); + } + + public MismatchingUserException(Throwable cause) { + super(cause); + } + + public MismatchingUserException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java new file mode 100644 index 0000000..7553e82 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java @@ -0,0 +1,19 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions; + +public class PlanningQuotaException extends PlanningException { + + private static final long serialVersionUID = 8206629288380246166L; + + public PlanningQuotaException(String message) { + super(message); + } + + public PlanningQuotaException(Throwable cause) { + super(cause); + } + + public PlanningQuotaException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java new file mode 100644 index 0000000..aac2b93 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java @@ -0,0 +1,19 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions; + +public class ResourceOverCommitException extends PlanningException { + + private static final long serialVersionUID = 7070699407526521032L; + + public ResourceOverCommitException(String message) { + super(message); + } + + public ResourceOverCommitException(Throwable cause) { + super(cause); + } + + public ResourceOverCommitException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java new file mode 100644 index 0000000..6001631 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java @@ -0,0 +1,142 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.HashSet; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestNoOverCommitPolicy { + + long step; + long initTime; + + InMemoryPlan plan; + ReservationAgent mAgent; + Resource minAlloc; + ResourceCalculator res; + Resource maxAlloc; + + int totCont = 1000000; + + @Before + public void setup() throws Exception { + + // 1 sec step + step = 1000L; + + initTime = System.currentTimeMillis(); + minAlloc = Resource.newInstance(1024, 1); + res = new DefaultResourceCalculator(); + maxAlloc = Resource.newInstance(1024 * 8, 8); + + mAgent = mock(ReservationAgent.class); + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont); + String reservationQ = testUtil.getFullReservationQueueName(); + CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); + NoOverCommitPolicy policy = new NoOverCommitPolicy(); + policy.init(reservationQ, capConf, new HashSet()); + + plan = + new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, + scheduler.getClusterResource(), step, res, minAlloc, maxAlloc, + "dedicated", null, true); + } + + public int[] generateData(int length, int val) { + int[] data = new int[length]; + for (int i = 0; i < length; i++) { + data[i] = val; + } + return data; + } + + @Test + public void testSingleUserEasyFitPass() throws IOException, PlanningException { + // generate allocation that easily fit within resource constraints + int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont)); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + + @Test + public void testSingleUserBarelyFitPass() throws IOException, + PlanningException { + // generate allocation from single tenant that barely fit + int[] f = generateData(3600, totCont); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + + @Test + public void testSingleFail() throws IOException, PlanningException { + // generate allocation from single tenant that exceed capacity + int[] f = generateData(3600, (int) (1.1 * totCont)); + try { + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil + .generateAllocation(initTime, step, f), res, minAlloc)); + Assert.fail(); + } catch (PlanningException e) { + // expected exception + } + } + + @Test + public void testMultiTenantPass() throws IOException, PlanningException { + // generate allocation from multiple tenants that barely fit in tot capacity + int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); + for (int i = 0; i < 4; i++) { + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u" + i, + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + } + + @Test + public void testMultiTenantFail() throws IOException, PlanningException { + // generate allocation from multiple tenants that exceed tot capacity + int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); + try { + for (int i = 0; i < 5; i++) { + assertTrue( + plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), + null, + "u" + i, + "dedicated", + initTime, + initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + Assert.fail(); + } catch (PlanningException e) { + // expected + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java new file mode 100644 index 0000000..a043e6a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java @@ -0,0 +1,233 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mortbay.log.Log; + +public class TestCapacityOverTimePolicy { + + long timeWindow; + long step; + float avgConstraint; + float instConstraint; + long initTime; + + InMemoryPlan plan; + ReservationAgent mAgent; + Resource minAlloc; + ResourceCalculator res; + Resource maxAlloc; + + int totCont = 1000000; + + @Before + public void setup() throws Exception { + + // 24h window + timeWindow = 86400000L; + // 1 sec step + step = 1000L; + + // 25% avg cap on capacity + avgConstraint = 25; + + // 70% instantaneous cap on capacity + instConstraint = 70; + + initTime = System.currentTimeMillis(); + minAlloc = Resource.newInstance(1024, 1); + res = new DefaultResourceCalculator(); + maxAlloc = Resource.newInstance(1024 * 8, 8); + + mAgent = mock(ReservationAgent.class); + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont); + String reservationQ = testUtil.getFullReservationQueueName(); + CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); + capConf.setReservationWindow(reservationQ, timeWindow); + capConf.setInstantaneousMaxCapacity(reservationQ, instConstraint); + capConf.setAverageCapacity(reservationQ, avgConstraint); + CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + policy.init(reservationQ, capConf, new HashSet()); + + plan = + new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, + scheduler.getClusterResource(), step, res, minAlloc, maxAlloc, + "dedicated", null, true); + } + + public int[] generateData(int length, int val) { + int[] data = new int[length]; + for (int i = 0; i < length; i++) { + data[i] = val; + } + return data; + } + + @Test + public void testSimplePass() throws IOException, PlanningException { + // generate allocation that simply fit within all constraints + int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont)); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + + @Test + public void testSimplePass2() throws IOException, PlanningException { + // generate allocation from single tenant that exceed avg momentarily but + // fit within + // max instantanesou + int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont)); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + + @Test + public void testMultiTenantPass() throws IOException, PlanningException { + // generate allocation from multiple tenants that barely fit in tot capacity + int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); + for (int i = 0; i < 4; i++) { + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u" + i, + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + } + + @Test + public void testMultiTenantFail() throws IOException, PlanningException { + // generate allocation from multiple tenants that exceed tot capacity + int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); + try { + for (int i = 0; i < 5; i++) { + assertTrue( + plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), + null, + "u" + i, + "dedicated", + initTime, + initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + Assert.fail(); + } catch (PlanningException e) { + // expected + } + } + + @Test + public void testInstFail() throws IOException { + // generate allocation that exceed the instantaneous cap single-show + int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont)); + try { + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + Assert.fail("should not have accepted this"); + } catch (PlanningException e) { + // expected + } + } + + @Test + public void testInstFailBySum() throws IOException, PlanningException { + // generate allocation that exceed the instantaneous cap by sum + int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont)); + try { + for (int i = 0; i < 3; i++) { + assertTrue( + plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), + null, + "u1", + "dedicated", + initTime, + initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + Assert.fail("Should not have accepted this"); + } catch (PlanningException e) { + // expected + } + } + + @Test + public void testFailAvg() throws IOException { + // generate an allocation which violates the 25% average single-shot + Map req = + new TreeMap(); + long win = timeWindow / 2 + 100; + int cont = (int) Math.ceil(0.5 * totCont); + req.put(new ReservationInterval(initTime, initTime + win), + ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont)); + try { + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + win, req, res, minAlloc))); + Log.info(plan.toString()); + Assert.fail("should not have accepted this"); + } catch (PlanningException e) { + // expected + } + } + + @Test + public void testFailAvgBySum() throws IOException { + // generate an allocation which violates the 25% average by sum + Map req = + new TreeMap(); + long win = 86400000 / 4 + 1; + int cont = (int) Math.ceil(0.5 * totCont); + req.put(new ReservationInterval(initTime, initTime + win), + ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont)); + try { + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + win, req, res, minAlloc))); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + win, req, res, minAlloc))); + Assert.fail("should not have accepted this"); + } catch (PlanningException e) { + // expected + } + } + +}