diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 3066339166cabb08c2b5f5fc240463841e8e14de..01714f0adca50dfb74eb157382f1bf39ddaf374c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; @@ -71,9 +72,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.utils.Lock; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; 
import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; @@ -94,9 +100,14 @@ protected Resource minimumAllocation; protected RMContext rmContext; - + private volatile Priority maxClusterLevelAppPriority; + protected SchedulerHealth schedulerHealth = new SchedulerHealth(); + private volatile long lastNodeUpdateTime; + + private volatile Clock clock; + /* * All schedulers which are inheriting AbstractYarnScheduler should use * concurrent version of 'applications' map. @@ -116,6 +127,7 @@ */ public AbstractYarnScheduler(String name) { super(name); + clock = SystemClock.getInstance(); } @Override @@ -208,6 +220,18 @@ protected void initMaximumResourceCapability(Resource maximumAllocation) { nodeTracker.setConfiguredMaxAllocation(maximumAllocation); } + public SchedulerHealth getSchedulerHealth() { + return this.schedulerHealth; + } + + protected void setLastNodeUpdateTime(long time) { + this.lastNodeUpdateTime = time; + } + + public long getLastNodeUpdateTime() { + return lastNodeUpdateTime; + } + protected synchronized void containerLaunchedOnNode( ContainerId containerId, SchedulerNode node) { // Get the application for the finished container @@ -773,4 +797,106 @@ private SchedContainerChangeRequest createSchedContainerChangeRequest( } return schedulerChangeRequests; } + + public Clock getClock() { + return clock; + } + + @VisibleForTesting + public void setClock(Clock clock) { + this.clock = clock; + } + + @Lock(Lock.NoLock.class) + public SchedulerNode getNode(NodeId nodeId) { + return nodeTracker.getNode(nodeId); + } + + /** + * Process a heartbeat update from a node. 
+ * @param nm The RMNode corresponding to the NodeManager + */ + protected synchronized void nodeUpdate(RMNode nm) { + if (LOG.isDebugEnabled()) { + LOG.debug("nodeUpdate: " + nm + + " cluster capacity: " + getClusterResource()); + } + + Resource releasedResources = Resource.newInstance(0, 0); + SchedulerNode node = getNode(nm.getNodeID()); + + List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); + List<ContainerStatus> newlyLaunchedContainers = + new ArrayList<>(); + List<ContainerStatus> completedContainers = + new ArrayList<>(); + for(UpdatedContainerInfo containerInfo : containerInfoList) { + newlyLaunchedContainers + .addAll(containerInfo.getNewlyLaunchedContainers()); + completedContainers.addAll(containerInfo.getCompletedContainers()); + } + + // Processing the newly launched containers + for (ContainerStatus launchedContainer : newlyLaunchedContainers) { + containerLaunchedOnNode(launchedContainer.getContainerId(), node); + } + + // Processing the newly increased containers + List<Container> newlyIncreasedContainers = + nm.pullNewlyIncreasedContainers(); + for (Container container : newlyIncreasedContainers) { + containerIncreasedOnNode(container.getId(), node, container); + } + + // Process completed containers + int releasedContainers = 0; + for (ContainerStatus completedContainer : completedContainers) { + ContainerId containerId = completedContainer.getContainerId(); + LOG.debug("Container FINISHED: " + containerId); + RMContainer container = getRMContainer(containerId); + completedContainer(getRMContainer(containerId), + completedContainer, RMContainerEventType.FINISHED); + if (container != null) { + releasedContainers++; + Resource rs = container.getAllocatedResource(); + if (rs != null) { + Resources.addTo(releasedResources, rs); + } + rs = container.getReservedResource(); + if (rs != null) { + Resources.addTo(releasedResources, rs); + } + } + } + + // If the node is decommissioning, send an update to have the total + // resource equal to the used resource, so no available resource to + // 
schedule. + // TODO: Fix possible race-condition when request comes in before + // update is propagated + if (nm.getState() == NodeState.DECOMMISSIONING) { + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption + .newInstance(getSchedulerNode(nm.getNodeID()) + .getAllocatedResource(), 0))); + } + + schedulerHealth.updateSchedulerReleaseDetails(getLastNodeUpdateTime(), + releasedResources); + schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); + + // Updating node resource utilization + node.setAggregatedContainersUtilization( + nm.getAggregatedContainersUtilization()); + node.setNodeUtilization(nm.getNodeUtilization()); + + // Now node data structures are up-to-date and ready for scheduling. + if(LOG.isDebugEnabled()) { + LOG.debug("Node being looked for scheduling " + nm + + " availableResource: " + node.getUnallocatedResource()); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ee62a701514304a2743063395368d5fc359d8611..8f2d6fec800861ff3e8616f2e25ebe69940f38a4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -54,7 +54,6 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import 
org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -90,8 +89,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -105,7 +102,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; @@ -230,8 +226,6 @@ public Configuration getConf() { private boolean scheduleAsynchronously; private AsyncScheduleThread asyncSchedulerThread; private RMNodeLabelsManager labelManager; - private SchedulerHealth schedulerHealth = new SchedulerHealth(); - volatile long lastNodeUpdateTime; /** * EXPERT @@ -1039,86 +1033,15 @@ public QueueInfo getQueueInfo(String queueName, return root.getQueueUserAclInfo(user); } - private synchronized void 
nodeUpdate(RMNode nm) { - if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + - " clusterResources: " + getClusterResource()); - } - - Resource releaseResources = Resource.newInstance(0, 0); - - FiCaSchedulerNode node = getNode(nm.getNodeID()); - - List containerInfoList = nm.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Processing the newly increased containers - List newlyIncreasedContainers = - nm.pullNewlyIncreasedContainers(); - for (Container container : newlyIncreasedContainers) { - containerIncreasedOnNode(container.getId(), node, container); - } - - // Process completed containers - int releasedContainers = 0; - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - RMContainer container = getRMContainer(containerId); - super.completedContainer(container, completedContainer, - RMContainerEventType.FINISHED); - if (container != null) { - releasedContainers++; - Resource rs = container.getAllocatedResource(); - if (rs != null) { - Resources.addTo(releaseResources, rs); - } - rs = container.getReservedResource(); - if (rs != null) { - Resources.addTo(releaseResources, rs); - } - } - } - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. 
- // TODO: Fix possible race-condition when request comes in before - // update is propagated - if (nm.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(nm.getNodeID()) - .getAllocatedResource(), 0))); - } - schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime, - releaseResources); - schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); - - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); - - // Now node data structures are upto date and ready for scheduling. - if(LOG.isDebugEnabled()) { - LOG.debug("Node being looked for scheduling " + nm + - " availableResource: " + node.getUnallocatedResource()); + @Override + protected synchronized void nodeUpdate(RMNode nm) { + setLastNodeUpdateTime(Time.now()); + super.nodeUpdate(nm); + if (!scheduleAsynchronously) { + allocateContainersToNode(getNode(nm.getNodeID())); } } - + /** * Process resource update on a node. 
*/ @@ -1215,7 +1138,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { return; } // reset allocation and reservation stats before we start doing any work - updateSchedulerHealth(lastNodeUpdateTime, node, + updateSchedulerHealth(getLastNodeUpdateTime(), node, new CSAssignment(Resources.none(), NodeType.NODE_LOCAL)); CSAssignment assignment; @@ -1253,7 +1176,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { tmp.getAssignmentInformation().addAllocationDetails( reservedContainer.getContainerId(), queue.getQueuePath()); tmp.getAssignmentInformation().incrAllocations(); - updateSchedulerHealth(lastNodeUpdateTime, node, tmp); + updateSchedulerHealth(getLastNodeUpdateTime(), node, tmp); schedulerHealth.updateSchedulerFulfilledReservationCounts(1); } } @@ -1277,7 +1200,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); if (Resources.greaterThan(calculator, getClusterResource(), assignment.getResource(), Resources.none())) { - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + updateSchedulerHealth(getLastNodeUpdateTime(), node, assignment); return; } @@ -1308,7 +1231,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { new ResourceLimits(labelManager.getResourceByLabel( RMNodeLabelsManager.NO_LABEL, getClusterResource())), SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + updateSchedulerHealth(getLastNodeUpdateTime(), node, assignment); } } else { LOG.info("Skipping scheduling since node " @@ -1360,12 +1283,7 @@ public void handle(SchedulerEvent event) { case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - RMNode node = nodeUpdatedEvent.getRMNode(); - setLastNodeUpdateTime(Time.now()); - nodeUpdate(node); - if (!scheduleAsynchronously) { - 
allocateContainersToNode(getNode(node.getNodeID())); - } + nodeUpdate(nodeUpdatedEvent.getRMNode()); } break; case APP_ADDED: @@ -2003,20 +1921,6 @@ private String handleMoveToPlanQueue(String targetQueueName) { } @Override - public SchedulerHealth getSchedulerHealth() { - return this.schedulerHealth; - } - - private void setLastNodeUpdateTime(long time) { - this.lastNodeUpdateTime = time; - } - - @Override - public long getLastNodeUpdateTime() { - return lastNodeUpdateTime; - } - - @Override public Priority checkAndGetApplicationPriority(Priority priorityFromContext, String user, String queueName, ApplicationId applicationId) throws YarnException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index bc953ba37379b64291c664a32d59fe95107a0de0..f6a500b9efc826d7472a6f3f87f4c77fefe6d3ba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -69,8 +68,6 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -91,8 +88,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -129,7 +124,6 @@ private Resource incrAllocation; private QueueManager queueMgr; - private volatile Clock clock; private boolean usePortForNodeName; private static final Log LOG = LogFactory.getLog(FairScheduler.class); @@ -214,7 +208,6 @@ public FairScheduler() { super(FairScheduler.class.getName()); - clock = SystemClock.getInstance(); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this); @@ -376,7 +369,7 @@ protected synchronized void update() { * threshold for each type of task. 
*/ private void updateStarvationStats() { - lastPreemptionUpdateTime = clock.getTime(); + lastPreemptionUpdateTime = getClock().getTime(); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { sched.updateStarvationStats(); } @@ -603,15 +596,6 @@ public synchronized int getContinuousSchedulingSleepMs() { return continuousSchedulingSleepMs; } - public Clock getClock() { - return clock; - } - - @VisibleForTesting - void setClock(Clock clock) { - this.clock = clock; - } - public FairSchedulerEventLog getEventLog() { return eventLog; } @@ -1008,64 +992,15 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, preemptionContainerIds, null, null, application.pullUpdatedNMTokens()); } } - - /** - * Process a heartbeat update from a node. - */ - private synchronized void nodeUpdate(RMNode nm) { + + @Override + protected synchronized void nodeUpdate(RMNode nm) { long start = getClock().getTime(); - if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + - " cluster capacity: " + getClusterResource()); - } eventLog.log("HEARTBEAT", nm.getHostName()); - FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); - - List containerInfoList = nm.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Process completed containers - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), - completedContainer, 
RMContainerEventType.FINISHED); - } - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. - if (nm.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(nm.getNodeID()) - .getAllocatedResource(), 0))); - } - - if (continuousSchedulingEnabled) { - if (!completedContainers.isEmpty()) { - attemptScheduling(node); - } - } else { - attemptScheduling(node); - } + super.nodeUpdate(nm); - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); + FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID()); + attemptScheduling(fsNode); long duration = getClock().getTime() - start; fsOpDurations.addNodeUpdateDuration(duration); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index eaab495a0d01d5e37d0925b74d7e7910b54913ca..7d46e74ef2446e1730670cc4e9734ac9d781be39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -43,14 +43,12 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import 
org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -69,8 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -384,10 +380,6 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, } } - private FiCaSchedulerNode getNode(NodeId nodeId) { - return nodeTracker.getNode(nodeId); - } - @VisibleForTesting public synchronized void addApplication(ApplicationId applicationId, String queue, String user, boolean isAppRecovering) { @@ -727,66 +719,6 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application return assignedContainers; } - private synchronized void nodeUpdate(RMNode rmNode) { - FiCaSchedulerNode node = getNode(rmNode.getNodeID()); - - List containerInfoList = rmNode.pullContainerUpdates(); - List newlyLaunchedContainers = new 
ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Process completed containers - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), - completedContainer, RMContainerEventType.FINISHED); - } - - // Updating node resource utilization - node.setAggregatedContainersUtilization( - rmNode.getAggregatedContainersUtilization()); - node.setNodeUtilization(rmNode.getNodeUtilization()); - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. 
- if (rmNode.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(rmNode.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(rmNode.getNodeID()) - .getAllocatedResource(), 0))); - } - - if (rmContext.isWorkPreservingRecoveryEnabled() - && !rmContext.isSchedulerReadyForAllocatingContainers()) { - return; - } - - if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), - node.getUnallocatedResource(), minimumAllocation)) { - LOG.debug("Node heartbeat " + rmNode.getNodeID() + - " available resource = " + node.getUnallocatedResource()); - - assignContainers(node); - - LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " - + node.getUnallocatedResource()); - } - - updateAvailableResourcesMetrics(); - } - private void increaseUsedResources(RMContainer rmContainer) { Resources.addTo(usedResource, rmContainer.getAllocatedResource()); } @@ -904,7 +836,7 @@ protected synchronized void completedContainerInternal( container.getId().getApplicationAttemptId().getApplicationId(); // Get the node on which the container was allocated - FiCaSchedulerNode node = getNode(container.getNodeId()); + FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(container.getNodeId()); if (application == null) { LOG.info("Unknown application: " + appId + @@ -1019,4 +951,28 @@ protected void decreaseContainer( // TODO Auto-generated method stub } + + @Override + protected synchronized void nodeUpdate(RMNode nm) { + super.nodeUpdate(nm); + + FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(nm.getNodeID()); + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + return; + } + + if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), + node.getUnallocatedResource(), minimumAllocation)) { + LOG.debug("Node heartbeat " + nm.getNodeID() + + " available resource = " + 
node.getUnallocatedResource()); + + assignContainers(node); + + LOG.debug("Node after allocation " + nm.getNodeID() + " resource = " + + node.getUnallocatedResource()); + } + + updateAvailableResourcesMetrics(); + } }