diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 2a6657aa5df..0de6fb3a7d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -57,6 +57,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.AMShareLimitCheckResult; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.AssignReservedContainerResult; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.HasContainerForNodeCheckResult; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.ResourceAssignment; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.ValidReservationCheckResult; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -74,6 +79,9 @@ private static final DefaultResourceCalculator RESOURCE_CALCULATOR = new DefaultResourceCalculator(); + private static final String ASSIGN_CONTAINER_MESSAGE_TEMPLATE = + "Assign container on node %s, assignType: %s, allowedLocality: %s, priority: %s, app attempt id: %s"; + private final long startTime; private final Priority appPriority; private Resource demand = Resources.createResource(0); @@ -690,7 +698,7 @@ public synchronized void recoverContainer(SchedulerNode node, /** * Reserve a spot for {@code container} on this {@code node}. If * the container is {@code alreadyReserved} on the node, simply - * update relevant bookeeping. This dispatches ro relevant handlers + * update relevant bookkeeping. This dispatches to relevant handlers * in {@link FSSchedulerNode}.. * return whether reservation was possible with the current threshold limits */ @@ -841,7 +849,7 @@ int getNumReservations(String rackName, boolean isAny) { * FairScheduler.CONTAINER_RESERVED. If no assignment or reservation was * made, returns an empty resource. 
*/ - private Resource assignContainer( + private ResourceAssignment assignContainer( FSSchedulerNode node, PendingAsk pendingAsk, NodeType type, boolean reserved, SchedulerRequestKey schedulerKey) { @@ -867,13 +875,14 @@ private Resource assignContainer( if (reserved) { unreserve(schedulerKey, node); } - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format( "Resource ask %s fits in available node resources %s, " + - "but no container was allocated", - capability, available)); + "but no container was allocated", + capability, available); } - return Resources.none(); + return ResourceAssignment.createEmpty(this, message); } // If we had previously made a reservation, delete it @@ -893,7 +902,13 @@ private Resource assignContainer( setAmRunning(true); } - return capability; + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format("Resource allocated on node %s with " + + "Resource capability %s, " + + "Available resources on node: %s", node, capability, available); + } + return ResourceAssignment.create(capability, this, message); } if (LOG.isDebugEnabled()) { @@ -913,15 +928,25 @@ private Resource assignContainer( if (LOG.isDebugEnabled()) { LOG.debug(getName() + "'s resource request is reserved."); } - return FairScheduler.CONTAINER_RESERVED; + + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format("Container is reserved on node %s with " + + "resource: %s", + node, pendingAsk.getPerAllocationResource()); + } + return ResourceAssignment.create(FairScheduler.CONTAINER_RESERVED, + this, message); } else { updateAMDiagnosticMsg(capability, " exceeds the available resources of " + "the node and the request cannot be reserved)"); - if (LOG.isDebugEnabled()) { - LOG.debug("Couldn't create reservation for app: " + getName() - + ", at priority " + schedulerKey.getPriority()); + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format( + "Couldn't create reservation for app: %s, at priority %s", + getName(), schedulerKey.getPriority()); } - return Resources.none(); + return ResourceAssignment.createEmpty(this, message); } } @@ -936,23 +961,47 @@ private boolean isReservable(Resource capacity) { /** * Whether the AM container for this app is over maxAMShare limit. */ - private boolean isOverAMShareLimit() { + private AMShareLimitCheckResult isOverAMShareLimit() { // Check the AM resource usage for the leaf queue if (!isAmRunning() && !getUnmanagedAM()) { - // Return true if we have not ask, or queue is not be able to run app's AM + // Return an over-limit result if we don't have an ask or the queue + // is not able to run the app's AM PendingAsk ask = appSchedulingInfo.getNextPendingAsk(); - if (ask.getCount() == 0 || !getQueue().canRunAppAM( - ask.getPerAllocationResource())) { - return true; + + if (ask.getCount() == 0) { + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format("App attempt %s does not have any ask " + + "on queue %s", + this, getQueue()); + } + return AMShareLimitCheckResult.createOverLimitResult(message); + } + + return getQueue().canRunAppAM( + ask.getPerAllocationResource()); } - return false; + + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format( + "AM share check result is successful! 
AM running: %b, " + + "Unmanaged AM: %b", + isAmRunning(), getUnmanagedAM()); + } + return AMShareLimitCheckResult.createBelowLimitResult(message); } @SuppressWarnings("deprecation") - private Resource assignContainer(FSSchedulerNode node, boolean reserved) { - if (LOG.isTraceEnabled()) { - LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved); + private ResourceAssignment assignContainer(FSSchedulerNode node, + boolean reserved) { + final ResourceAssignment resourceAssignment; + + if (ResourceAssignment.shouldLogReservationActivityTrace()) { + resourceAssignment = ResourceAssignment.createEmptyWithTrace(this, + "Node offered to app: " + getName() + " reserved: " + reserved); + } else { + resourceAssignment = ResourceAssignment.createEmpty(this, ""); } Collection keysToTry = (reserved) ? @@ -973,7 +1022,15 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { for (SchedulerRequestKey schedulerKey : keysToTry) { // Skip it for reserved container, since // we already check it in isValidReservation. - if (!reserved && !hasContainerForNode(schedulerKey, node)) { + HasContainerForNodeCheckResult hasContainerForNode = + hasContainerForNode(schedulerKey, node); + if (!reserved && !hasContainerForNode.isSuccessful()) { + if (ResourceAssignment.shouldLogReservationActivityTrace()) { + resourceAssignment.traceMessage(String.format( + "Ignoring schedulerKey: %s! " + + "Reserved: %b, hasContainerForNode result: %s", + schedulerKey, reserved, hasContainerForNode.getMessage())); + } continue; } @@ -1003,15 +1060,10 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { scheduler.getNodeLocalityThreshold(), scheduler.getRackLocalityThreshold()); } - if (rackLocalPendingAsk.getCount() > 0 && nodeLocalPendingAsk.getCount() > 0) { - if (LOG.isTraceEnabled()) { - LOG.trace("Assign container on " + node.getNodeName() - + " node, assignType: NODE_LOCAL" + ", allowedLocality: " - + allowedLocality + ", priority: " + schedulerKey.getPriority() - + ", app attempt id: " + this.attemptId); - } + logContainerAssignment(node, resourceAssignment, schedulerKey, + allowedLocality, "NODE_LOCAL"); return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL, reserved, schedulerKey); } @@ -1023,12 +1075,8 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { if (rackLocalPendingAsk.getCount() > 0 && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality .equals(NodeType.OFF_SWITCH))) { - if (LOG.isTraceEnabled()) { - LOG.trace("Assign container on " + node.getNodeName() - + " node, assignType: RACK_LOCAL" + ", allowedLocality: " - + allowedLocality + ", priority: " + schedulerKey.getPriority() - + ", app attempt id: " + this.attemptId); - } + logContainerAssignment(node, resourceAssignment, schedulerKey, + allowedLocality, "RACK_LOCAL"); return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL, reserved, schedulerKey); } @@ -1042,20 +1090,16 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { if (offswitchAsk.getCount() > 0) { if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks() <= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Assign container on " + node.getNodeName() - + " node, assignType: OFF_SWITCH" + ", allowedLocality: " - + allowedLocality + ", priority: " - + schedulerKey.getPriority() - + ", app attempt id: " + this.attemptId); - } + logContainerAssignment(node, resourceAssignment, schedulerKey, + 
allowedLocality, "OFF_SWITCH"); return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH, reserved, schedulerKey); } } - if (LOG.isTraceEnabled()) { - LOG.trace("Can't assign container on " + node.getNodeName() + if (ResourceAssignment.shouldLogReservationActivityTrace()) { + resourceAssignment.traceMessage("Can't assign container on " + + node.getNodeName() + " node, allowedLocality: " + allowedLocality + ", priority: " + schedulerKey.getPriority() + ", app attempt id: " + this.attemptId); @@ -1065,14 +1109,30 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { writeLock.unlock(); } - return Resources.none(); + if (ResourceAssignment.shouldLogReservationActivity()) { + resourceAssignment.logMessage(String.format( + "Cant't assign container on node %s with Scheduler request keys: %s", + node.getNodeName(), keysToTry)); + } + return resourceAssignment; + } + + private void logContainerAssignment(FSSchedulerNode node, + ResourceAssignment resourceAssignment, SchedulerRequestKey schedulerKey, + NodeType allowedLocality, String assignType) { + if (ResourceAssignment.shouldLogReservationActivityTrace()) { + resourceAssignment.traceMessage(String.format( + ASSIGN_CONTAINER_MESSAGE_TEMPLATE, + node.getNodeName(), assignType, allowedLocality, + schedulerKey.getPriority(), this.attemptId)); + } } /** * Whether this app has containers requests that could be satisfied on the * given node, if the node had full space. */ - private boolean hasContainerForNode(SchedulerRequestKey key, + private HasContainerForNodeCheckResult hasContainerForNode(SchedulerRequestKey key, FSSchedulerNode node) { PendingAsk offswitchAsk = getPendingAsk(key, ResourceRequest.ANY); Resource resource = offswitchAsk.getPerAllocationResource(); @@ -1083,38 +1143,68 @@ private boolean hasContainerForNode(SchedulerRequestKey key, boolean hasRequestForNode = getOutstandingAsksCount(key, node.getNodeName()) > 0; - boolean ret = true; + boolean canDelayToAny = appSchedulingInfo.canDelayTo(key, + ResourceRequest.ANY); + boolean canDelayToNode = appSchedulingInfo.canDelayTo(key, + node.getRackName()); + boolean fitsInNodeCapability = Resources.fitsIn(resource, + node.getRMNode().getTotalCapability()); if (!(// There must be outstanding requests at the given priority: hasRequestForOffswitch && // If locality relaxation is turned off at *-level, there must be a // non-zero request for the node's rack: - (appSchedulingInfo.canDelayTo(key, ResourceRequest.ANY) || - (hasRequestForRack)) && + (canDelayToAny || hasRequestForRack) && // If locality relaxation is turned off at rack-level, // there must be a non-zero request at the node: - (!hasRequestForRack || appSchedulingInfo.canDelayTo(key, - node.getRackName()) || (hasRequestForNode)) && + (!hasRequestForRack || canDelayToNode || hasRequestForNode) && // The requested container must be able to fit on the node: - Resources.fitsIn(resource, - node.getRMNode().getTotalCapability()))) { - ret = false; + fitsInNodeCapability)) { + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format("No container is assigned for appAttempt %s " + + "on node %s! 
Details: " + + "Off-switch ask: %s " + + "hasRequestForOffSwitch: %b, hasRequestForRack: %b, " + + "hasRequestForNode: %b, canDelayToAny: %b, canDelayToNode: %b, " + + "fitsInNodeCapability: %b", this, node, offswitchAsk, + hasRequestForOffswitch, hasRequestForRack, hasRequestForNode, + canDelayToAny, canDelayToNode, fitsInNodeCapability); + } + return HasContainerForNodeCheckResult.createUnsuccessful(message); } else if (!getQueue().fitsInMaxShare(resource)) { // The requested container must fit in queue maximum share updateAMDiagnosticMsg(resource, " exceeds current queue or its parents maximum resource allowed). " + "Max share of queue: " + getQueue().getMaxShare()); - ret = false; + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format("%s exceeds current queue or its " + + "parent's maximum resource allowed. " + + "Max share of queue: %s", resource, getQueue().getMaxShare()); + } + return HasContainerForNodeCheckResult.createUnsuccessful(message); } - return ret; + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format("Container is assigned for appAttempt %s " + + "on node %s! Details: " + + "Off-switch ask: %s, " + + "Max share of queue: %s", this, node, offswitchAsk, + getQueue().getMaxShare()); + } + return HasContainerForNodeCheckResult.createSuccessful(message); } - private boolean isValidReservation(FSSchedulerNode node) { + private ValidReservationCheckResult isValidReservation(FSSchedulerNode node) { SchedulerRequestKey schedulerKey = node.getReservedContainer(). getReservedSchedulerKey(); - return hasContainerForNode(schedulerKey, node) && - !isOverAMShareLimit(); + HasContainerForNodeCheckResult hasContainerForNode = + hasContainerForNode(schedulerKey, node); + AMShareLimitCheckResult overAMShareLimit = isOverAMShareLimit(); + return ValidReservationCheckResult.createFrom(hasContainerForNode, + overAMShareLimit); } /** @@ -1127,19 +1217,24 @@ private boolean isValidReservation(FSSchedulerNode node) { * Node that the application has an existing reservation on * @return whether the reservation on the given node is valid. */ - boolean assignReservedContainer(FSSchedulerNode node) { + AssignReservedContainerResult assignReservedContainer(FSSchedulerNode node) { RMContainer rmContainer = node.getReservedContainer(); SchedulerRequestKey reservedSchedulerKey = rmContainer.getReservedSchedulerKey(); - if (!isValidReservation(node)) { + ValidReservationCheckResult validReservation = isValidReservation(node); + if (!validReservation.isSuccessful()) { // Don't hold the reservation if app can no longer use it LOG.info("Releasing reservation that cannot be satisfied for " + "application " + getApplicationAttemptId() + " on node " + node); unreserve(reservedSchedulerKey, node); - return false; + return AssignReservedContainerResult. + createInvalid(validReservation, this); } + AssignReservedContainerResult result = + AssignReservedContainerResult.createValid(validReservation, this); + // Reservation valid; try to fulfill the reservation if (LOG.isDebugEnabled()) { LOG.debug("Trying to fulfill reservation for application " @@ -1149,11 +1244,31 @@ boolean assignReservedContainer(FSSchedulerNode node) { // Fail early if the reserved container won't fit. // Note that we have an assumption here that // there's only one container size per priority. 
- if (Resources.fitsIn(node.getReservedContainer().getReservedResource(), - node.getUnallocatedResource())) { + Resource reservedContainerResources = + node.getReservedContainer().getReservedResource(); + boolean fitsInNodeResources = + Resources.fitsIn(reservedContainerResources, + node.getUnallocatedResource()); + if (fitsInNodeResources) { + if (ResourceAssignment.shouldLogReservationActivity()) { + result.logAdditionalInfo( + String.format("Trying to assign container for appAttempt %s " + + "on node %s, Resources of reserved container: %s, " + + "Unallocated resources of node: %s", this, node, + reservedContainerResources, node.getUnallocatedResource())); + } assignContainer(node, true); + } else { + if (ResourceAssignment.shouldLogReservationActivity()) { + result.logAdditionalInfo( + String.format("Tried to assign container for appAttempt %s " + + "on node %s, but there was not enough resources to take! " + + "Resources of reserved container: %s, " + + "Unallocated resources of node: %s", this, node, + reservedContainerResources, node.getUnallocatedResource())); + } } - return true; + return result; } /** @@ -1372,17 +1487,22 @@ public void updateDemand() { } @Override - public Resource assignContainer(FSSchedulerNode node) { - if (isOverAMShareLimit()) { + public ResourceAssignment assignContainer(FSSchedulerNode node) { + AMShareLimitCheckResult amShareLimitCheckResult = isOverAMShareLimit(); + if (!amShareLimitCheckResult.isSuccessful()) { PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk(); updateAMDiagnosticMsg(amAsk.getPerAllocationResource(), " exceeds maximum AM resource allowed)."); - if (LOG.isDebugEnabled()) { - LOG.debug("AM resource request: " + amAsk.getPerAllocationResource() - + " exceeds maximum AM resource allowed, " - + getQueue().dumpState()); + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format("AM resource request: %s exceeds maximum " + + "AM resource allowed, " + + "queue dump: %s", amAsk.getPerAllocationResource(), + getQueue().dumpState()); + } - return Resources.none(); + return ResourceAssignment.createEmptyWithPreCheckResult(this, + amShareLimitCheckResult, message); } return assignContainer(node, false); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index a0388873278..f86b3871012 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -44,6 +44,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.AMShareLimitCheckResult; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.AssignContainerPreCheckResult; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.ResourceAssignment; import org.apache.hadoop.yarn.util.resource.Resources; import static org.apache.hadoop.yarn.util.resource.Resources.none; @@ -335,31 +338,48 @@ public void updateDemand() { } @Override - public Resource assignContainer(FSSchedulerNode node) { - Resource assigned = none(); + public ResourceAssignment assignContainer(FSSchedulerNode node) { if (LOG.isDebugEnabled()) { LOG.debug("Node " + node.getNodeName() + " offered to queue: " + getName() + " fairShare: " + getFairShare()); } - if (!assignContainerPreCheck(node)) { - return assigned; + AssignContainerPreCheckResult preCheckResult = + assignContainerPreCheck(node); + if (!preCheckResult.isSuccessful()) { + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = "Assign container precheck on node " + node + " failed"; + } + return ResourceAssignment.createEmptyWithPreCheckResult(preCheckResult, + message); } for (FSAppAttempt sched : fetchAppsWithDemand(true)) { if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) { continue; } - assigned = sched.assignContainer(node); - if (!assigned.equals(none())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Assigned container in queue:" + getName() + " " + - "container:" + assigned); + ResourceAssignment assignment = sched.assignContainer(node); + if (!assignment.isEmpty()) { + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format("Assigned container " + + "in queue: %s, container: %s", getName(), + assignment.getResource()); + } - break; + assignment.logMessage(message); + return assignment; } } - return assigned; + + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format("Assignment was not allocated on " + + "child queue: %s", this); + } + return ResourceAssignment.createEmptyWithPreCheckResult(preCheckResult, + message); } /** @@ -514,15 +534,31 @@ private Resource computeMaxAMResource() { * @param amResource resources required to run the AM * @return true if this queue can run */ - public boolean canRunAppAM(Resource amResource) { + AMShareLimitCheckResult canRunAppAM(Resource amResource) { if (Math.abs(maxAMShare - -1.0f) < 0.0001) { - return true; + return AMShareLimitCheckResult.createAMShareDisabledResult(getName()); } Resource maxAMResource = computeMaxAMResource(); getMetrics().setMaxAMShare(maxAMResource); - Resource ifRunAMResource = Resources.add(amResourceUsage, amResource); - return Resources.fitsIn(ifRunAMResource, maxAMResource); + Resource currentAMUsage = Resources.add(amResourceUsage, amResource); + boolean result = Resources.fitsIn(currentAMUsage, maxAMResource); + + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format("AM share limit result is " + + (result ? " successful" : " not successful! ") + + "Resources required to run AM: %s, " + + "max AM resource: %s, " + + "current AM resource usage " + + "(incl. 
Resources required to run AM): %s", + amResource, maxAMResource, currentAMUsage); + } + + if (result) { + return AMShareLimitCheckResult.createBelowLimitResult(message); + } + return AMShareLimitCheckResult.createOverLimitResult(message); } void addAMResourceUsage(Resource amResource) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index e9f4af67450..bc92ab6f2f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.AssignContainerPreCheckResult; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.ResourceAssignment; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -190,15 +192,17 @@ private QueueUserACLInfo getUserAclInfo(UserGroupInformation user) { } @Override - public Resource assignContainer(FSSchedulerNode node) { - Resource assigned = Resources.none(); - + public ResourceAssignment assignContainer(FSSchedulerNode node) { // If this queue is over its limit, reject - if (!assignContainerPreCheck(node)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Assign container precheck on node " + node + " failed"); + AssignContainerPreCheckResult preCheckResult = + assignContainerPreCheck(node); + if (!preCheckResult.isSuccessful()) { + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = "Assign container precheck on node " + node + " failed"; } - return assigned; + return ResourceAssignment.createEmptyWithPreCheckResult(preCheckResult, + message); } // Sort the queues while holding a read lock on this parent only. 
@@ -214,15 +218,21 @@ public Resource assignContainer(FSSchedulerNode node) { try { sortedChildQueues.addAll(childQueues); for (FSQueue child : sortedChildQueues) { - assigned = child.assignContainer(node); - if (!Resources.equals(assigned, Resources.none())) { - break; + ResourceAssignment assignment = child.assignContainer(node); + if (!assignment.isEmpty()) { + return assignment; } } } finally { readLock.unlock(); } - return assigned; + + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = "Assignment was not allocated on " + + "any of the child queues: " + sortedChildQueues; + } + return ResourceAssignment.createEmpty(message); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 6217f550f8b..7eb9092b979 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -44,10 +44,16 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.AssignContainerPreCheckResult; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.ResourceAssignment; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.AssignContainerPreCheckResult.PreCheckResult.NODE_HAS_RESERVED_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.AssignContainerPreCheckResult.PreCheckResult.RESOURCE_USAGE_GREATER_THAN_MAXSHARE; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.AssignContainerPreCheckResult.PreCheckResult.SUCCESSFUL; + @Private @Unstable public abstract class FSQueue implements Queue, Schedulable { @@ -426,22 +432,33 @@ public abstract void collectSchedulerApplications( * * @return true if check passes (can assign) or false otherwise */ - boolean assignContainerPreCheck(FSSchedulerNode node) { + AssignContainerPreCheckResult assignContainerPreCheck(FSSchedulerNode node) { if (node.getReservedContainer() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Assigning container failed on node '" + node.getNodeName() - + " because it has reserved containers."); + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = + "Assigning container failed on node " + node.getNodeName() + + " because it has reserved containers."; } - return false; + return new AssignContainerPreCheckResult(NODE_HAS_RESERVED_CONTAINERS, + message); } else if (!Resources.fitsIn(getResourceUsage(), getMaxShare())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Assigning container failed on node '" + node.getNodeName() - + " because queue resource usage is larger 
than MaxShare: " - + dumpState()); + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = + "Assigning container failed on node " + node.getNodeName() + + " because queue resource usage is larger than MaxShare: " + + dumpState(); } - return false; + return new AssignContainerPreCheckResult( + RESOURCE_USAGE_GREATER_THAN_MAXSHARE, message); } else { - return true; + String message = ""; + if (ResourceAssignment.shouldLogReservationActivity()) { + message = String.format("Assign container precheck passed on node %s! " + + "Details of queue: %s", node, dumpState()); + } + return new AssignContainerPreCheckResult(SUCCESSFUL, message); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index e5d2a066c4a..2481612ac2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -91,6 +91,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.AssignReservedContainerResult; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.ResourceAssignment; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; @@ -1112,13 +1114,13 @@ static void assignPreemptedContainers(FSSchedulerNode node) { FSAppAttempt app = entry.getKey(); Resource preemptionPending = Resources.clone(entry.getValue()); while (!app.isStopped() && !Resources.isNone(preemptionPending)) { - Resource assigned = app.assignContainer(node); - if (Resources.isNone(assigned) || - assigned.equals(FairScheduler.CONTAINER_RESERVED)) { + ResourceAssignment assignment = app.assignContainer(node); + if (assignment.isEmpty() || assignment.isContainerReserved()) { // Fail to assign, let's not try further break; } - Resources.subtractFromNonNegative(preemptionPending, assigned); + Resources.subtractFromNonNegative(preemptionPending, + assignment.getResource()); } } } @@ -1152,11 +1154,21 @@ void attemptScheduling(FSSchedulerNode node) { // when C does not qualify for preemption itself. 
assignPreemptedContainers(node); FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); - boolean validReservation = false; + + AssignReservedContainerResult reservedContainerResult = + AssignReservedContainerResult.createInvalid(null, + reservedAppSchedulable); if (reservedAppSchedulable != null) { - validReservation = reservedAppSchedulable.assignReservedContainer(node); + reservedContainerResult = + reservedAppSchedulable.assignReservedContainer(node); + } + + if (ResourceAssignment.shouldLogReservationActivity()) { + ResourceAssignment.logMessage( + String.format("AssignReservedContainerResult: %s", + reservedContainerResult), reservedAppSchedulable); } - if (!validReservation) { + if (!reservedContainerResult.isSuccessful()) { // No reservation, schedule at queue which is farthest below fair share int assignedContainers = 0; Resource assignedResource = Resources.clone(Resources.none()); @@ -1164,19 +1176,28 @@ void attemptScheduling(FSSchedulerNode node) { node.getUnallocatedResource(), 0.5f); while (node.getReservedContainer() == null) { - Resource assignment = queueMgr.getRootQueue().assignContainer(node); + ResourceAssignment assignment = + queueMgr.getRootQueue().assignContainer(node); - if (assignment.equals(Resources.none())) { - if (LOG.isDebugEnabled()) { - LOG.debug("No container is allocated on node " + node); + if (assignment.isEmpty()) { + if (ResourceAssignment.shouldLogReservationActivity()) { + assignment.logMessage("No container is allocated on node " + + node); } break; } assignedContainers++; - Resources.addTo(assignedResource, assignment); + Resources.addTo(assignedResource, assignment.getResource()); if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign, assignedResource)) { + if (ResourceAssignment.shouldLogReservationActivityTrace()) { + assignment.traceMessage(String.format( + "shouldContinueAssigning returned false on node %s! " + + "Assigned containers: %d, maxResourcesToAssign: %s, " + + "assignedResource: %s", node, assignedContainers, + maxResourcesToAssign, assignedResource)); + } break; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index bd1ff7ada25..03da20025eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.ResourceAssignment; /** * A Schedulable represents an entity that can be scheduled such as an @@ -90,7 +91,7 @@ * Assign a container on this node if possible, and return the amount of * resources assigned. */ - Resource assignContainer(FSSchedulerNode node); + ResourceAssignment assignContainer(FSSchedulerNode node); /** Get the fair share assigned to this Schedulable. 
*/ Resource getFairShare(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/AMShareLimitCheckResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/AMShareLimitCheckResult.java new file mode 100644 index 00000000000..6dd59aa4122 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/AMShareLimitCheckResult.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment; + +public class AMShareLimitCheckResult implements FSAssignmentCheckResult { + private static final String AM_SHARE_CHECK_DISABLED = + "AM share check is disabled for queue %s!"; + private final boolean successful; + private final String message; + + private AMShareLimitCheckResult(boolean successful, String message) { + this.successful = successful; + this.message = message; + } + + public static AMShareLimitCheckResult createBelowLimitResult(String message) { + return new AMShareLimitCheckResult(true, message); + } + + public static AMShareLimitCheckResult createOverLimitResult(String message) { + return new AMShareLimitCheckResult(false, message); + } + + /** + * If AM share is disabled, the result will be successful + * like the request would have been below the AM share. + * @param queueName + * @return + */ + public static AMShareLimitCheckResult createAMShareDisabledResult( + String queueName) { + return new AMShareLimitCheckResult(true, + String.format(AM_SHARE_CHECK_DISABLED, queueName)); + } + + /** + * @return true, if request is below AM share limit. 
+ */ + @Override + public boolean isSuccessful() { + return successful; + } + + @Override + public String getMessage() { + return message; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/AssignContainerPreCheckResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/AssignContainerPreCheckResult.java new file mode 100644 index 00000000000..5c40ef43375 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/AssignContainerPreCheckResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.AssignContainerPreCheckResult.PreCheckResult.SUCCESSFUL; + +public class AssignContainerPreCheckResult implements FSAssignmentCheckResult { + public enum PreCheckResult { + NODE_HAS_RESERVED_CONTAINERS, RESOURCE_USAGE_GREATER_THAN_MAXSHARE, + SUCCESSFUL + } + + private PreCheckResult resultType; + private String message; + + public AssignContainerPreCheckResult(PreCheckResult resultType, + String message) { + this.resultType = resultType; + this.message = message; + } + + public boolean isSuccessful() { + return resultType == SUCCESSFUL; + } + + @Override + public String getMessage() { + return message; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/AssignReservedContainerResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/AssignReservedContainerResult.java new file mode 100644 index 00000000000..92fd4024db2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/AssignReservedContainerResult.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt; + +public class AssignReservedContainerResult implements FSAssignmentCheckResult { + private ValidReservationCheckResult reservationCheckResult; + private FSAppAttempt appAttempt; + + private AssignReservedContainerResult( + ValidReservationCheckResult validReservation, FSAppAttempt appAttempt) { + this.reservationCheckResult = validReservation; + this.appAttempt = appAttempt; + } + + public static AssignReservedContainerResult createInvalid( + ValidReservationCheckResult checkResult, FSAppAttempt appAttempt) { + if (checkResult != null && checkResult.isSuccessful()) { + throw new IllegalStateException( + "Only invalid (unsuccessful) ValidReservationCheckResults " + + "are allowed to pass here!"); + } + return new AssignReservedContainerResult(checkResult, appAttempt); + } + + public static AssignReservedContainerResult createValid( + ValidReservationCheckResult checkResult, FSAppAttempt appAttempt) { + if (!checkResult.isSuccessful()) { + throw new IllegalStateException( + "Only valid (successful) ValidReservationCheckResults " + + "are allowed to pass here!"); + } + return new AssignReservedContainerResult(checkResult, appAttempt); + } + + @Override + public String getMessage() { + if (reservationCheckResult != null) { + return reservationCheckResult.getMessage(); + } + return ""; + } + + @Override + public boolean isSuccessful() { + if (reservationCheckResult == null) { + return false; + } + return reservationCheckResult.isSuccessful(); + } + + public void logAdditionalInfo(String message) { + ResourceAssignment.logMessage(message, appAttempt); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/FSAssignmentCheckResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/FSAssignmentCheckResult.java new file mode 100644 index 00000000000..1ed77d47fe1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/FSAssignmentCheckResult.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment; + +public interface FSAssignmentCheckResult { + String getMessage(); + boolean isSuccessful(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/HasContainerForNodeCheckResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/HasContainerForNodeCheckResult.java new file mode 100644 index 00000000000..cce197505a3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/HasContainerForNodeCheckResult.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment; + +public class HasContainerForNodeCheckResult implements FSAssignmentCheckResult { + private final boolean successful; + private final String message; + + private HasContainerForNodeCheckResult(boolean successful, String message) { + this.successful = successful; + this.message = message; + } + + public static HasContainerForNodeCheckResult createUnsuccessful( + String message) { + return new HasContainerForNodeCheckResult(false, message); + } + + public static HasContainerForNodeCheckResult createSuccessful( + String message) { + return new HasContainerForNodeCheckResult(true, message); + } + + @Override + public String getMessage() { + return message; + } + + @Override + public boolean isSuccessful() { + return successful; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/ResourceAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/ResourceAssignment.java new file mode 100644 index 00000000000..c37b5df36d0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/ResourceAssignment.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +/** + * This class is used to store container Resource assignments, + * along with debug/trace messages that can tell more about + * how and why {@link FairScheduler} decided to either allocate or not + * allocate containers to {@link FSAppAttempt}s. + * If clients wants to create log records in connection with + * Resource assignments, + * they should invoke shouldLogReservationActivity first to check + * if the log level is DEBUG or TRACE. + * After that, clients should invoke either logMessage or traceMessage. 
+ * + */ +public class ResourceAssignment { + private static final Logger LOG = + LoggerFactory.getLogger(ResourceAssignment.class); + private static final String LOG_FORMAT_TEMPLATE = + "[AppId: %s, AppAttemptId: %s] %s"; + + private final Resource resource; + private final FSAppAttempt appAttempt; + private final String message; + + private ResourceAssignment(Resource resource, FSAppAttempt appAttempt, + String message, Level level) { + this.resource = resource; + this.appAttempt = appAttempt; + this.message = message; + logMessage(level); + } + + private ResourceAssignment(Resource resource, FSAppAttempt appAttempt, + String message) { + this(resource, appAttempt, message, Level.DEBUG); + } + + private void logMessage(Level level) { + if (message != null && !message.isEmpty()) { + if (level == Level.TRACE) { + traceMessage(message, appAttempt); + } else if (level == Level.DEBUG) { + logMessage(message, appAttempt); + } + } + } + + public static ResourceAssignment create(Resource resource, + FSAppAttempt appAttempt, String message) { + return new ResourceAssignment(resource, appAttempt, message); + } + + public static ResourceAssignment createEmpty(String message) { + return new ResourceAssignment(Resources.none(), null, message); + } + + public static ResourceAssignment createEmpty(FSAppAttempt appAttempt, + String message) { + return new ResourceAssignment(Resources.none(), appAttempt, message); + } + + public static ResourceAssignment createEmptyWithTrace( + FSAppAttempt appAttempt, String message) { + return new ResourceAssignment(Resources.none(), appAttempt, message, + Level.TRACE); + } + + public static ResourceAssignment createEmptyWithPreCheckResult( + FSAssignmentCheckResult checkResult, String additionalMessage) { + return createEmptyWithPreCheckResult(null, checkResult, additionalMessage); + } + + public static ResourceAssignment createEmptyWithPreCheckResult( + FSAppAttempt appAttempt, FSAssignmentCheckResult checkResult, + String additionalMessage) { + String fullMessage = checkResult.getMessage(); + if (shouldLogReservationActivity()) { + fullMessage = String.format("Message of checkResult: %s, " + + "additional message: %s", checkResult.getMessage(), + additionalMessage); + } + + return createEmpty(appAttempt, fullMessage); + } + + public void traceMessage(String message) { + trace(message, appAttempt); + } + + private void traceMessage(String message, FSAppAttempt appAttempt) { + trace(message, appAttempt); + } + + public void logMessage(String message) { + debug(message, appAttempt); + } + + public static void logMessage(String message, FSAppAttempt appAttempt) { + debug(message, appAttempt); + } + + private void trace(String message, FSAppAttempt appAttempt) { + if (appAttempt != null) { + LOG.trace(String.format(LOG_FORMAT_TEMPLATE, + appAttempt.getApplicationId(), appAttempt.getApplicationAttemptId(), + message)); + } else { + LOG.trace(message); + } + } + + private static void debug(String message, FSAppAttempt appAttempt) { + if (appAttempt != null) { + LOG.debug(String.format(LOG_FORMAT_TEMPLATE, + appAttempt.getApplicationId(), appAttempt.getApplicationAttemptId(), + message)); + } else { + LOG.debug(message); + } + } + + public boolean isEmpty() { + return Resources.isNone(resource); + } + + public boolean isContainerReserved() { + return Resources.equals(resource, FairScheduler.CONTAINER_RESERVED); + } + + public Resource getResource() { + return resource; + } + + public static boolean shouldLogReservationActivity() { + return LOG.isDebugEnabled() || LOG.isTraceEnabled(); 
+ } + + public static boolean shouldLogReservationActivityTrace() { + return LOG.isTraceEnabled(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/ValidReservationCheckResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/ValidReservationCheckResult.java new file mode 100644 index 00000000000..769b42518c2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/assignment/ValidReservationCheckResult.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment; + +public class ValidReservationCheckResult implements FSAssignmentCheckResult { + + private final boolean successful; + private final String message; + + private ValidReservationCheckResult(boolean successful, String message) { + this.successful = successful; + this.message = message; + } + + public static ValidReservationCheckResult createFrom( + HasContainerForNodeCheckResult hasContainerForNode, + AMShareLimitCheckResult overAMShareLimit) { + boolean successful = hasContainerForNode.isSuccessful() && + overAMShareLimit.isSuccessful(); + String message = String.format( + "Overall ValidReservationCheckResult: %b, " + + "Result of hasContainerForNode check: %s, " + + "Result of overAMShareLimit check: %s", + successful, hasContainerForNode.getMessage(), + overAMShareLimit.getMessage()); + return new ValidReservationCheckResult(successful, message); + } + + @Override + public String getMessage() { + return message; + } + + @Override + public boolean isSuccessful() { + return successful; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java index b1fc2d0bd94..00d3a7b657e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java @@ -20,6 +20,7 @@ import 
org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.ResourceAssignment; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.Resources; @@ -84,7 +85,7 @@ public FakeSchedulable(Resource minShare, Resource maxShare, } @Override - public Resource assignContainer(FSSchedulerNode node) { + public ResourceAssignment assignContainer(FSSchedulerNode node) { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java index b016c1b4fb8..1b11c729fa6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.ResourceAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; @@ -278,7 +279,7 @@ public void updateDemand() { } @Override - public Resource assignContainer(FSSchedulerNode node) { + public ResourceAssignment assignContainer(FSSchedulerNode node) { throw new UnsupportedOperationException(); }
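
A minimal usage sketch of the ResourceAssignment API introduced by this patch, based only on the methods visible in the diff above (assignContainer returning ResourceAssignment, shouldLogReservationActivity, logMessage, isEmpty, isContainerReserved, getResource). The ResourceAssignmentUsageSketch class and its tryAssign helper are hypothetical and exist purely to illustrate the pattern described in the ResourceAssignment javadoc; they are not part of the patch.

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.assignment.ResourceAssignment;
import org.apache.hadoop.yarn.util.resource.Resources;

public final class ResourceAssignmentUsageSketch {

  private ResourceAssignmentUsageSketch() {
  }

  static Resource tryAssign(Schedulable schedulable, FSSchedulerNode node) {
    // assignContainer now returns a ResourceAssignment instead of a bare Resource.
    ResourceAssignment assignment = schedulable.assignContainer(node);

    // Per the ResourceAssignment javadoc: check the log level first so that
    // message construction only happens when DEBUG/TRACE logging is enabled.
    if (ResourceAssignment.shouldLogReservationActivity()) {
      assignment.logMessage("Assignment on node " + node.getNodeName()
          + " returned resource " + assignment.getResource());
    }

    // An empty or reserved assignment carries no newly allocated resource.
    if (assignment.isEmpty() || assignment.isContainerReserved()) {
      return Resources.none();
    }
    return assignment.getResource();
  }
}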