diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 8f03de25791eaea72597a9b887a2a6a6e731a8bb..9647634cd4e43e7824019a16d423eb0d05d65291 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; @@ -71,9 +72,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.utils.Lock; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; 
import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; @@ -94,9 +100,14 @@ protected Resource minimumAllocation; protected RMContext rmContext; - + private volatile Priority maxClusterLevelAppPriority; + protected SchedulerHealth schedulerHealth = new SchedulerHealth(); + private volatile long lastNodeUpdateTime; + + private volatile Clock clock; + /* * All schedulers which are inheriting AbstractYarnScheduler should use * concurrent version of 'applications' map. @@ -116,6 +127,7 @@ */ public AbstractYarnScheduler(String name) { super(name); + clock = SystemClock.getInstance(); } @Override @@ -208,6 +220,18 @@ protected void initMaximumResourceCapability(Resource maximumAllocation) { nodeTracker.setConfiguredMaxAllocation(maximumAllocation); } + public SchedulerHealth getSchedulerHealth() { + return this.schedulerHealth; + } + + protected void setLastNodeUpdateTime(long time) { + this.lastNodeUpdateTime = time; + } + + public long getLastNodeUpdateTime() { + return lastNodeUpdateTime; + } + protected synchronized void containerLaunchedOnNode( ContainerId containerId, SchedulerNode node) { // Get the application for the finished container @@ -781,4 +805,106 @@ private SchedContainerChangeRequest createSchedContainerChangeRequest( } return schedulerChangeRequests; } + + public Clock getClock() { + return clock; + } + + @VisibleForTesting + public void setClock(Clock clock) { + this.clock = clock; + } + + @Lock(Lock.NoLock.class) + public SchedulerNode getNode(NodeId nodeId) { + return nodeTracker.getNode(nodeId); + } + + /** + * Process a heartbeat update from a node. 
+ * @param nm The RMNode corresponding to the NodeManager + */ + protected synchronized void nodeUpdate(RMNode nm) { + if (LOG.isDebugEnabled()) { + LOG.debug("nodeUpdate: " + nm + + " cluster capacity: " + getClusterResource()); + } + + Resource releasedResources = Resource.newInstance(0, 0); + SchedulerNode node = getNode(nm.getNodeID()); + + List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); + List<ContainerStatus> newlyLaunchedContainers = + new ArrayList<>(); + List<ContainerStatus> completedContainers = + new ArrayList<>(); + for(UpdatedContainerInfo containerInfo : containerInfoList) { + newlyLaunchedContainers + .addAll(containerInfo.getNewlyLaunchedContainers()); + completedContainers.addAll(containerInfo.getCompletedContainers()); + } + + // Processing the newly launched containers + for (ContainerStatus launchedContainer : newlyLaunchedContainers) { + containerLaunchedOnNode(launchedContainer.getContainerId(), node); + } + + // Processing the newly increased containers + List<Container> newlyIncreasedContainers = + nm.pullNewlyIncreasedContainers(); + for (Container container : newlyIncreasedContainers) { + containerIncreasedOnNode(container.getId(), node, container); + } + + // Process completed containers + int releasedContainers = 0; + for (ContainerStatus completedContainer : completedContainers) { + ContainerId containerId = completedContainer.getContainerId(); + LOG.debug("Container FINISHED: " + containerId); + RMContainer container = getRMContainer(containerId); + completedContainer(getRMContainer(containerId), + completedContainer, RMContainerEventType.FINISHED); + if (container != null) { + releasedContainers++; + Resource rs = container.getAllocatedResource(); + if (rs != null) { + Resources.addTo(releasedResources, rs); + } + rs = container.getReservedResource(); + if (rs != null) { + Resources.addTo(releasedResources, rs); + } + } + } + + // If the node is decommissioning, send an update to have the total + // resource equal to the used resource, so no available resource to + //
schedule. + // TODO: Fix possible race-condition when request comes in before + // update is propagated + if (nm.getState() == NodeState.DECOMMISSIONING) { + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption + .newInstance(getSchedulerNode(nm.getNodeID()) + .getAllocatedResource(), 0))); + } + + schedulerHealth.updateSchedulerReleaseDetails(getLastNodeUpdateTime(), + releasedResources); + schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); + + // Updating node resource utilization + node.setAggregatedContainersUtilization( + nm.getAggregatedContainersUtilization()); + node.setNodeUtilization(nm.getNodeUtilization()); + + // Now node data structures are up-to-date and ready for scheduling. + if(LOG.isDebugEnabled()) { + LOG.debug("Node being looked for scheduling " + nm + + " availableResource: " + node.getUnallocatedResource()); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 920e983b0ab60e5da99a734c21a88b0bf8a574dd..ec41c375389c2f2fc517c06d827e0941797e6f19 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -54,7 +54,6 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import 
org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -90,8 +89,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -105,7 +102,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; @@ -230,8 +226,6 @@ public Configuration getConf() { private boolean scheduleAsynchronously; private AsyncScheduleThread asyncSchedulerThread; private RMNodeLabelsManager labelManager; - private SchedulerHealth schedulerHealth = new SchedulerHealth(); - volatile long lastNodeUpdateTime; /** * EXPERT @@ -1046,86 +1040,6 @@ public QueueInfo getQueueInfo(String queueName, return root.getQueueUserAclInfo(user); } - private synchronized void 
nodeUpdate(RMNode nm) { - if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + - " clusterResources: " + getClusterResource()); - } - - Resource releaseResources = Resource.newInstance(0, 0); - - FiCaSchedulerNode node = getNode(nm.getNodeID()); - - List containerInfoList = nm.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Processing the newly increased containers - List newlyIncreasedContainers = - nm.pullNewlyIncreasedContainers(); - for (Container container : newlyIncreasedContainers) { - containerIncreasedOnNode(container.getId(), node, container); - } - - // Process completed containers - int releasedContainers = 0; - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - RMContainer container = getRMContainer(containerId); - super.completedContainer(container, completedContainer, - RMContainerEventType.FINISHED); - if (container != null) { - releasedContainers++; - Resource rs = container.getAllocatedResource(); - if (rs != null) { - Resources.addTo(releaseResources, rs); - } - rs = container.getReservedResource(); - if (rs != null) { - Resources.addTo(releaseResources, rs); - } - } - } - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. 
- // TODO: Fix possible race-condition when request comes in before - // update is propagated - if (nm.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(nm.getNodeID()) - .getAllocatedResource(), 0))); - } - schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime, - releaseResources); - schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); - - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); - - // Now node data structures are upto date and ready for scheduling. - if(LOG.isDebugEnabled()) { - LOG.debug("Node being looked for scheduling " + nm + - " availableResource: " + node.getUnallocatedResource()); - } - } - /** * Process resource update on a node. */ @@ -1222,7 +1136,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { return; } // reset allocation and reservation stats before we start doing any work - updateSchedulerHealth(lastNodeUpdateTime, node, + updateSchedulerHealth(getLastNodeUpdateTime(), node, new CSAssignment(Resources.none(), NodeType.NODE_LOCAL)); CSAssignment assignment; @@ -1260,7 +1174,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { tmp.getAssignmentInformation().addAllocationDetails( reservedContainer.getContainerId(), queue.getQueuePath()); tmp.getAssignmentInformation().incrAllocations(); - updateSchedulerHealth(lastNodeUpdateTime, node, tmp); + updateSchedulerHealth(getLastNodeUpdateTime(), node, tmp); schedulerHealth.updateSchedulerFulfilledReservationCounts(1); } } @@ -1284,7 +1198,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); if (Resources.greaterThan(calculator, getClusterResource(), 
assignment.getResource(), Resources.none())) { - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + updateSchedulerHealth(getLastNodeUpdateTime(), node, assignment); return; } @@ -1315,7 +1229,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { new ResourceLimits(labelManager.getResourceByLabel( RMNodeLabelsManager.NO_LABEL, getClusterResource())), SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + updateSchedulerHealth(getLastNodeUpdateTime(), node, assignment); } } else { LOG.info("Skipping scheduling since node " @@ -2010,20 +1924,6 @@ private String handleMoveToPlanQueue(String targetQueueName) { } @Override - public SchedulerHealth getSchedulerHealth() { - return this.schedulerHealth; - } - - private void setLastNodeUpdateTime(long time) { - this.lastNodeUpdateTime = time; - } - - @Override - public long getLastNodeUpdateTime() { - return lastNodeUpdateTime; - } - - @Override public Priority checkAndGetApplicationPriority(Priority priorityFromContext, String user, String queueName, ApplicationId applicationId) throws YarnException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index c59ba12cbd08efabc70786827f25262ba62e7a52..4fbd76d76d42a5e51216fe31eb2b567849abda1f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ 
-41,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -69,8 +68,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -91,8 +88,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -129,7 +124,6 @@ private Resource incrAllocation; private QueueManager queueMgr; - private volatile Clock clock; private boolean usePortForNodeName; private static final Log LOG = LogFactory.getLog(FairScheduler.class); @@ -212,7 +206,6 @@ public FairScheduler() { super(FairScheduler.class.getName()); - clock = 
SystemClock.getInstance(); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this); @@ -374,7 +367,7 @@ protected synchronized void update() { * threshold for each type of task. */ private void updateStarvationStats() { - lastPreemptionUpdateTime = clock.getTime(); + lastPreemptionUpdateTime = getClock().getTime(); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { sched.updateStarvationStats(); } @@ -601,15 +594,6 @@ public synchronized int getContinuousSchedulingSleepMs() { return continuousSchedulingSleepMs; } - public Clock getClock() { - return clock; - } - - @VisibleForTesting - void setClock(Clock clock) { - this.clock = clock; - } - public FairSchedulerEventLog getEventLog() { return eventLog; } @@ -1012,64 +996,15 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, preemptionContainerIds, null, null, application.pullUpdatedNMTokens()); } } - - /** - * Process a heartbeat update from a node. 
- */ - private synchronized void nodeUpdate(RMNode nm) { + + @Override + protected synchronized void nodeUpdate(RMNode nm) { long start = getClock().getTime(); - if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + - " cluster capacity: " + getClusterResource()); - } eventLog.log("HEARTBEAT", nm.getHostName()); - FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); - - List containerInfoList = nm.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Process completed containers - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), - completedContainer, RMContainerEventType.FINISHED); - } - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. 
- if (nm.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(nm.getNodeID()) - .getAllocatedResource(), 0))); - } - - if (continuousSchedulingEnabled) { - if (!completedContainers.isEmpty()) { - attemptScheduling(node); - } - } else { - attemptScheduling(node); - } + super.nodeUpdate(nm); - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); + FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID()); + attemptScheduling(fsNode); long duration = getClock().getTime() - start; fsOpDurations.addNodeUpdateDuration(duration); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 3e6225ff88144b3e86ce1a740ebfb70ec74063b3..23416f707989097e9484fb29128fd5d3efe628ac 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -43,14 +43,12 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import 
org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -69,8 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -390,10 +386,6 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, } } - private FiCaSchedulerNode getNode(NodeId nodeId) { - return nodeTracker.getNode(nodeId); - } - @VisibleForTesting public synchronized void addApplication(ApplicationId applicationId, String queue, String user, boolean isAppRecovering) { @@ -732,66 +724,6 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application return assignedContainers; } - private synchronized void nodeUpdate(RMNode rmNode) { - FiCaSchedulerNode node = getNode(rmNode.getNodeID()); - - List containerInfoList = rmNode.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - 
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Process completed containers - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), - completedContainer, RMContainerEventType.FINISHED); - } - - // Updating node resource utilization - node.setAggregatedContainersUtilization( - rmNode.getAggregatedContainersUtilization()); - node.setNodeUtilization(rmNode.getNodeUtilization()); - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. 
- if (rmNode.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(rmNode.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(rmNode.getNodeID()) - .getAllocatedResource(), 0))); - } - - if (rmContext.isWorkPreservingRecoveryEnabled() - && !rmContext.isSchedulerReadyForAllocatingContainers()) { - return; - } - - if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), - node.getUnallocatedResource(), minimumAllocation)) { - LOG.debug("Node heartbeat " + rmNode.getNodeID() + - " available resource = " + node.getUnallocatedResource()); - - assignContainers(node); - - LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " - + node.getUnallocatedResource()); - } - - updateAvailableResourcesMetrics(); - } - private void increaseUsedResources(RMContainer rmContainer) { Resources.addTo(usedResource, rmContainer.getAllocatedResource()); } @@ -909,7 +841,7 @@ protected synchronized void completedContainerInternal( container.getId().getApplicationAttemptId().getApplicationId(); // Get the node on which the container was allocated - FiCaSchedulerNode node = getNode(container.getNodeId()); + FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(container.getNodeId()); if (application == null) { LOG.info("Unknown application: " + appId + @@ -1024,4 +956,28 @@ protected void decreaseContainer( // TODO Auto-generated method stub } + + @Override + protected synchronized void nodeUpdate(RMNode nm) { + super.nodeUpdate(nm); + + FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(nm.getNodeID()); + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + return; + } + + if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), + node.getUnallocatedResource(), minimumAllocation)) { + LOG.debug("Node heartbeat " + nm.getNodeID() + + " available resource = " + 
node.getUnallocatedResource()); + + assignContainers(node); + + LOG.debug("Node after allocation " + nm.getNodeID() + " resource = " + + node.getUnallocatedResource()); + } + + updateAvailableResourcesMetrics(); + } }