diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java index 5a61b94..0f90991 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java @@ -331,7 +331,8 @@ private void validateInput(Plan plan, ReservationRequest rr, // placing other ReservationRequest within the same // ReservationDefinition, // and we must avoid double-counting the available resources - tempAssigned.addInterval(reservationInt, reservationRes); + tempAssigned.addInterval(reservationInt, + InMemoryReservationAllocation.toResource(reservationRes)); allocationRequests.put(reservationInt, reservationRes); } @@ -353,7 +354,8 @@ private void validateInput(Plan plan, ReservationRequest rr, for (Map.Entry tempAllocation : allocationRequests.entrySet()) { tempAssigned.removeInterval(tempAllocation.getKey(), - tempAllocation.getValue()); + InMemoryReservationAllocation.toResource( + tempAllocation.getValue())); } // and return null to signal failure in this allocation return null; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java index ce2e7d7..50d66cf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -31,7 +31,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -110,7 +109,7 @@ public QueueMetrics getQueueMetrics() { private void incrementAllocation(ReservationAllocation reservation) { assert (readWriteLock.isWriteLockedByCurrentThread()); - Map allocationRequests = + Map allocationRequests = reservation.getAllocationRequests(); // check if we have encountered the user earlier and if not add an entry String user = reservation.getUser(); @@ -119,7 +118,7 @@ private void incrementAllocation(ReservationAllocation reservation) { resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc); userResourceAlloc.put(user, resAlloc); } - for (Map.Entry r : allocationRequests + for (Map.Entry r : allocationRequests .entrySet()) { resAlloc.addInterval(r.getKey(), r.getValue()); rleSparseVector.addInterval(r.getKey(), r.getValue()); @@ -128,11 +127,11 @@ private void incrementAllocation(ReservationAllocation reservation) { private void decrementAllocation(ReservationAllocation reservation) { assert (readWriteLock.isWriteLockedByCurrentThread()); - Map allocationRequests = + Map allocationRequests = reservation.getAllocationRequests(); String user = reservation.getUser(); RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user); - for (Map.Entry r : allocationRequests + for (Map.Entry r : allocationRequests .entrySet()) { resAlloc.removeInterval(r.getKey(), r.getValue()); rleSparseVector.removeInterval(r.getKey(), r.getValue()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java index fc8407b..7ada1e6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.apache.hadoop.yarn.api.records.ReservationDefinition; @@ -40,7 +41,7 @@ private final ReservationDefinition contract; private final long startTime; private final long endTime; - private final Map allocationRequests; + private final Map allocationRequests; private boolean hasGang = false; private long acceptedAt = -1; @@ -49,25 +50,33 @@ InMemoryReservationAllocation(ReservationId reservationID, ReservationDefinition contract, String user, String planName, long startTime, long endTime, - Map allocationRequests, + Map allocations, ResourceCalculator calculator, Resource minAlloc) { this.contract = contract; this.startTime = startTime; this.endTime = endTime; this.reservationID = reservationID; this.user = user; - this.allocationRequests = allocationRequests; + this.allocationRequests = new HashMap<>(); this.planName = planName; resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc); - for (Map.Entry r : allocationRequests + for (Map.Entry r : allocations .entrySet()) { - resourcesOverTime.addInterval(r.getKey(), r.getValue()); + Resource capacity = toResource(r.getValue()); + allocationRequests.put(r.getKey(), capacity); + resourcesOverTime.addInterval(r.getKey(), capacity); if (r.getValue().getConcurrency() > 1) { hasGang = true; } } } + public static Resource toResource(ReservationRequest request) { + Resource resource = Resources.multiply(request.getCapability(), + (float)request.getNumContainers()); + return resource; + } + @Override public ReservationId getReservationId() { return reservationID; @@ -89,7 +98,7 @@ public long getEndTime() { } @Override - public Map getAllocationRequests() { + public Map getAllocationRequests() { return Collections.unmodifiableMap(allocationRequests); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java index 3f6f405..2957cc6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.StringWriter; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; @@ -31,9 +30,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -80,14 +77,11 @@ private boolean isSameAsNext(Long key, Resource capacity) { * * @param reservationInterval the interval for which the resource is to be * added - * @param capacity the resource to be added + * @param totCap the resource to be added * @return true if addition is successful, false otherwise */ public boolean addInterval(ReservationInterval reservationInterval, - ReservationRequest capacity) { - Resource totCap = - Resources.multiply(capacity.getCapability(), - (float) capacity.getNumContainers()); + Resource totCap) { if (totCap.equals(ZERO_RESOURCE)) { return true; } @@ -143,44 +137,15 @@ public boolean addInterval(ReservationInterval reservationInterval, } /** - * Add multiple resources for the specified interval - * - * @param reservationInterval the interval for which the resource is to be - * added - * @param ReservationRequests the resources to be added - * @param clusterResource the total resources in the cluster - * @return true if addition is successful, false otherwise - */ - public boolean addCompositeInterval(ReservationInterval reservationInterval, - List ReservationRequests, Resource clusterResource) { - ReservationRequest aggregateReservationRequest = - Records.newRecord(ReservationRequest.class); - Resource capacity = Resource.newInstance(0, 0); - for (ReservationRequest ReservationRequest : ReservationRequests) { - Resources.addTo(capacity, Resources.multiply( - ReservationRequest.getCapability(), - ReservationRequest.getNumContainers())); - } - aggregateReservationRequest.setNumContainers((int) Math.ceil(Resources - .divide(resourceCalculator, clusterResource, capacity, minAlloc))); - aggregateReservationRequest.setCapability(minAlloc); - - return addInterval(reservationInterval, aggregateReservationRequest); - } - - /** * Removes a resource for the specified interval * * @param reservationInterval the interval for which the resource is to be * removed - * @param capacity the resource to be removed + * @param totCap the resource to be removed * @return true if removal is successful, false otherwise */ public boolean removeInterval(ReservationInterval reservationInterval, - ReservationRequest capacity) { - Resource totCap = - Resources.multiply(capacity.getCapability(), - (float) capacity.getNumContainers()); + Resource totCap) { if (totCap.equals(ZERO_RESOURCE)) { return true; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java index 89c0e55..0d3c692 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java @@ -22,7 +22,6 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; /** @@ -71,7 +70,7 @@ * @return the allocationRequests the map of resources requested against the * time interval for which they were */ - public Map getAllocationRequests(); + public Map getAllocationRequests(); /** * Return a string identifying the plan to which the reservation belongs diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java index 76f39dc..190cf50 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java @@ -175,13 +175,25 @@ private void doAssertions(ReservationAllocation rAllocation, int[] alloc) { Assert.assertEquals(reservationID, rAllocation.getReservationId()); Assert.assertEquals(rDef, rAllocation.getReservationDefinition()); - Assert.assertEquals(allocations, rAllocation.getAllocationRequests()); + Assert.assertEquals(toResources(allocations), + rAllocation.getAllocationRequests()); Assert.assertEquals(user, rAllocation.getUser()); Assert.assertEquals(planName, rAllocation.getPlanName()); Assert.assertEquals(start, rAllocation.getStartTime()); Assert.assertEquals(start + alloc.length + 1, rAllocation.getEndTime()); } + private Map toResources( + Map allocations) { + Map resources = new HashMap<>(); + for (Map.Entry + entry : allocations.entrySet()) { + resources.put(entry.getKey(), + InMemoryReservationAllocation.toResource(entry.getValue())); + } + return resources; + } + private ReservationDefinition createSimpleReservationDefinition(long arrival, long deadline, long duration) { // create a request with a single atomic ask diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java index c7301c7..c00d7d1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java @@ -49,7 +49,8 @@ public void testBlocks() { Set> inputs = generateAllocation(start, alloc, false).entrySet(); for (Entry ip : inputs) { - rleSparseVector.addInterval(ip.getKey(), ip.getValue()); + rleSparseVector.addInterval(ip.getKey(), + InMemoryReservationAllocation.toResource(ip.getValue())); } LOG.info(rleSparseVector.toString()); Assert.assertFalse(rleSparseVector.isEmpty()); @@ -64,7 +65,8 @@ public void testBlocks() { Assert.assertEquals(Resource.newInstance(0, 0), rleSparseVector.getCapacityAtTime(start + alloc.length + 2)); for (Entry ip : inputs) { - rleSparseVector.removeInterval(ip.getKey(), ip.getValue()); + rleSparseVector.removeInterval(ip.getKey(), + InMemoryReservationAllocation.toResource(ip.getValue())); } LOG.info(rleSparseVector.toString()); for (int i = 0; i < alloc.length; i++) { @@ -86,7 +88,8 @@ public void testSteps() { Set> inputs = generateAllocation(start, alloc, true).entrySet(); for (Entry ip : inputs) { - rleSparseVector.addInterval(ip.getKey(), ip.getValue()); + rleSparseVector.addInterval(ip.getKey(), + InMemoryReservationAllocation.toResource(ip.getValue())); } LOG.info(rleSparseVector.toString()); Assert.assertFalse(rleSparseVector.isEmpty()); @@ -102,7 +105,8 @@ public void testSteps() { Assert.assertEquals(Resource.newInstance(0, 0), rleSparseVector.getCapacityAtTime(start + alloc.length + 2)); for (Entry ip : inputs) { - rleSparseVector.removeInterval(ip.getKey(), ip.getValue()); + rleSparseVector.removeInterval(ip.getKey(), + InMemoryReservationAllocation.toResource(ip.getValue())); } LOG.info(rleSparseVector.toString()); for (int i = 0; i < alloc.length; i++) { @@ -124,7 +128,8 @@ public void testSkyline() { Set> inputs = generateAllocation(start, alloc, true).entrySet(); for (Entry ip : inputs) { - rleSparseVector.addInterval(ip.getKey(), ip.getValue()); + rleSparseVector.addInterval(ip.getKey(), + InMemoryReservationAllocation.toResource(ip.getValue())); } LOG.info(rleSparseVector.toString()); Assert.assertFalse(rleSparseVector.isEmpty()); @@ -140,7 +145,8 @@ public void testSkyline() { Assert.assertEquals(Resource.newInstance(0, 0), rleSparseVector.getCapacityAtTime(start + alloc.length + 2)); for (Entry ip : inputs) { - rleSparseVector.removeInterval(ip.getKey(), ip.getValue()); + rleSparseVector.removeInterval(ip.getKey(), + InMemoryReservationAllocation.toResource(ip.getValue())); } LOG.info(rleSparseVector.toString()); for (int i = 0; i < alloc.length; i++) { @@ -157,7 +163,7 @@ public void testZeroAlloaction() { RLESparseResourceAllocation rleSparseVector = new RLESparseResourceAllocation(resCalc, minAlloc); rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE), - ReservationRequest.newInstance(Resource.newInstance(0, 0), (0))); + Resource.newInstance(0, 0)); LOG.info(rleSparseVector.toString()); Assert.assertEquals(Resource.newInstance(0, 0), rleSparseVector.getCapacityAtTime(new Random().nextLong()));