diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 482751f..f4bb87f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -353,7 +353,7 @@ public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority, synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, Priority priority, ResourceRequest request, - Container container) { + Container reservedContainer) { // Update allowed locality level NodeType allowed = allowedLocalityLevel.get(priority); if (allowed != null) { @@ -373,6 +373,11 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && if (getTotalRequiredResources(priority) <= 0) { return null; } + + Container container = reservedContainer; + if (container == null) { + container = createContainer(node, request.getCapability(), request.getPriority()); + } // Create RMContainer RMContainer rmContainer = new RMContainerImpl(container, @@ -485,21 +490,24 @@ public Container createContainer( * in {@link FSSchedulerNode}.. * return whether reservation was possible with the current threshold limits */ - private boolean reserve(Priority priority, FSSchedulerNode node, - Container container, NodeType type, boolean alreadyReserved) { + private boolean reserve(ResourceRequest request, FSSchedulerNode node, + Container reservedContainer, NodeType type) { + Priority priority = request.getPriority(); if (!reservationExceedsThreshold(node, type)) { LOG.info("Making reservation: node=" + node.getNodeName() + " app_id=" + getApplicationId()); - if (!alreadyReserved) { - getMetrics().reserveResource(getUser(), container.getResource()); + if (reservedContainer == null) { + reservedContainer = + createContainer(node, request.getCapability(), request.getPriority()); + getMetrics().reserveResource(getUser(), reservedContainer.getResource()); RMContainer rmContainer = - super.reserve(node, priority, null, container); + super.reserve(node, priority, null, reservedContainer); node.reserveResource(this, priority, rmContainer); setReservation(node); } else { RMContainer rmContainer = node.getReservedContainer(); - super.reserve(node, priority, rmContainer, container); + super.reserve(node, priority, rmContainer, reservedContainer); node.reserveResource(this, priority, rmContainer); setReservation(node); } @@ -615,18 +623,16 @@ private Resource assignContainer( // How much does the node have? Resource available = node.getUnallocatedResource(); - Container container = null; + Container reservedContainer = null; if (reserved) { - container = node.getReservedContainer().getContainer(); - } else { - container = createContainer(node, capability, request.getPriority()); + reservedContainer = node.getReservedContainer().getContainer(); } // Can we allocate a container on this node? if (Resources.fitsIn(capability, available)) { // Inform the application of the new container for this request RMContainer allocatedContainer = - allocate(type, node, request.getPriority(), request, container); + allocate(type, node, request.getPriority(), request, reservedContainer); if (allocatedContainer == null) { // Did the application need this resource? if (reserved) { @@ -647,30 +653,30 @@ private Resource assignContainer( // the AM. Set the amResource for this app and update the leaf queue's AM // usage if (!isAmRunning() && !getUnmanagedAM()) { - setAMResource(container.getResource()); - getQueue().addAMResourceUsage(container.getResource()); + setAMResource(capability); + getQueue().addAMResourceUsage(capability); setAmRunning(true); } - return container.getResource(); + return capability; } // The desired container won't fit here, so reserve - if (isReservable(container) && - reserve(request.getPriority(), node, container, type, reserved)) { + if (isReservable(capability) && + reserve(request, node, reservedContainer, type)) { return FairScheduler.CONTAINER_RESERVED; } else { if (LOG.isDebugEnabled()) { - LOG.debug("Not creating reservation as container " + container.getId() - + " is not reservable"); + LOG.debug("Couldn't creating reservation for " + + getName() + ",at priority " + priority); } return Resources.none(); } } - private boolean isReservable(Container container) { + private boolean isReservable(Resource capacity) { return scheduler.isAtLeastReservationThreshold( - getQueue().getPolicy().getResourceCalculator(), container.getResource()); + getQueue().getPolicy().getResourceCalculator(), capacity); } private boolean hasNodeOrRackLocalRequests(Priority priority) {