diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index d5bfc57e66d..aced880b9b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -79,6 +79,7 @@ // Last updated time private volatile long lastHeartbeatMonotonicTime; + protected boolean isStopping = false; public SchedulerNode(RMNode node, boolean usePortForNodeName, Set labels) { @@ -155,8 +156,8 @@ public String getRackName() { * application. * @param rmContainer Allocated container */ - public void allocateContainer(RMContainer rmContainer) { - allocateContainer(rmContainer, false); + public boolean allocateContainer(RMContainer rmContainer) { + return allocateContainer(rmContainer, false); } /** @@ -165,8 +166,15 @@ public void allocateContainer(RMContainer rmContainer) { * @param rmContainer Allocated container * @param launchedOnNode True if the container has been launched */ - protected synchronized void allocateContainer(RMContainer rmContainer, + protected synchronized boolean allocateContainer(RMContainer rmContainer, boolean launchedOnNode) { + if (isStopping) { + LOG.info("Node=" + getNodeID() + + " is stopping by scheduler, skip allocating container=" + + rmContainer.toString()); + return false; + } + Container container = rmContainer.getContainer(); if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { deductUnallocatedResource(container.getResource()); @@ -175,6 +183,16 @@ protected synchronized void allocateContainer(RMContainer rmContainer, launchedContainers.put(container.getId(), new ContainerInfo(rmContainer, launchedOnNode)); + + return true; + } + + /** + * Set the node is stopping, so no new allocation / reservation will be taken. + * This will be useful when scheduler enables async scheduling. + */ + public synchronized void notifyStoppingByScheduler() { + isStopping = true; } /** @@ -300,8 +318,9 @@ public synchronized void deductUnallocatedResource(Resource resource) { * @param attempt Application attempt asking for the reservation. * @param schedulerKey Priority of the reservation. * @param container Container reserving resources for. + * @return resource reserved on the node or not. */ - public abstract void reserveResource(SchedulerApplicationAttempt attempt, + public abstract boolean reserveResource(SchedulerApplicationAttempt attempt, SchedulerRequestKey schedulerKey, RMContainer container); /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 50ab70d03ec..33818703c93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1079,6 +1079,9 @@ private void doneApplicationAttempt( return; } + // Let app knows it is stopping by scheduler + attempt.notifyStoppingByScheduler(); + // Release all the allocated, acquired, running containers for (RMContainer rmContainer : attempt.getLiveContainers()) { if (keepContainers && rmContainer.getState().equals( @@ -1466,7 +1469,8 @@ private CSAssignment allocateContainerOnSingleNode( // Backward compatible way to make sure previous behavior which allocation // driven by node heartbeat works. if (getNode(node.getNodeID()) != node) { - LOG.error("Trying to schedule on a removed node, please double check."); + LOG.error("Trying to schedule on a removed node, please double check, " + + "nodeId=" + node.getNodeID()); return null; } @@ -1480,7 +1484,10 @@ private CSAssignment allocateContainerOnSingleNode( FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer( reservedContainer.getContainerId()); if (reservedApplication == null) { - LOG.error("Trying to schedule for a finished app, please double check."); + LOG.error( + "Trying to schedule for a finished app, please double check. nodeId=" + + node.getNodeID() + " container=" + reservedContainer + .getContainerId()); return null; } @@ -1904,6 +1911,9 @@ private void removeNode(RMNode nodeInfo) { return; } + // Let node knows it is stopping by scheduler to block future allocations. + node.notifyStoppingByScheduler(); + // Remove running containers List runningContainers = node.getCopiedListOfRunningContainers(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 3ec81915706..cf39bef5148 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -117,6 +117,9 @@ private Map toBeRemovedIncRequests = new ConcurrentHashMap<>(); + // Is it stopping by scheduler? + private boolean isStopping = false; + public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, AbstractUsersManager abstractUsersManager, RMContext rmContext) { @@ -394,6 +397,13 @@ public boolean accept(Resource cluster, try { readLock.lock(); + if (isStopping) { + if (LOG.isDebugEnabled()) { + LOG.debug("Application=" + getApplicationAttemptId() + + " is stopping by scheduler, skip accept call"); + } + } + // First make sure no container in release list in final state if (anyContainerInFinalState(request)) { return false; @@ -506,6 +516,14 @@ public boolean apply(Resource cluster, ResourceCommitRequest @@ -547,6 +565,18 @@ public boolean apply(Resource cluster, ResourceCommitRequest containers, * @param launchedOnNode True if the container has been launched */ @Override - protected synchronized void allocateContainer(RMContainer rmContainer, + protected synchronized boolean allocateContainer(RMContainer rmContainer, boolean launchedOnNode) { - super.allocateContainer(rmContainer, launchedOnNode); + if (!super.allocateContainer(rmContainer, launchedOnNode)) { + return false; + } + if (LOG.isDebugEnabled()) { final Container container = rmContainer.getContainer(); LOG.debug("Assigned container " + container.getId() + " of capacity " @@ -268,6 +272,8 @@ protected synchronized void allocateContainer(RMContainer rmContainer, } else { LOG.error("Allocated empty container" + rmContainer.getContainerId()); } + + return true; } /**