diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 64eb7773a024baceb1c0d3240077077db6fdac5c..d0a07b54a5d10f5406abf2d16cf6b37a1b21d2f7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; @@ -71,9 +72,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.utils.Lock; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; @@ -94,9 +100,14 @@ protected Resource minimumAllocation; protected RMContext rmContext; - + private volatile Priority maxClusterLevelAppPriority; + protected SchedulerHealth schedulerHealth = new SchedulerHealth(); + private volatile long lastNodeUpdateTime; + + private volatile Clock clock; + /* * All schedulers which are inheriting AbstractYarnScheduler should use * concurrent version of 'applications' map. @@ -116,6 +127,7 @@ */ public AbstractYarnScheduler(String name) { super(name); + clock = SystemClock.getInstance(); } @Override @@ -208,6 +220,18 @@ protected void initMaximumResourceCapability(Resource maximumAllocation) { nodeTracker.setConfiguredMaxAllocation(maximumAllocation); } + public SchedulerHealth getSchedulerHealth() { + return this.schedulerHealth; + } + + protected void setLastNodeUpdateTime(long time) { + this.lastNodeUpdateTime = time; + } + + public long getLastNodeUpdateTime() { + return lastNodeUpdateTime; + } + protected synchronized void containerLaunchedOnNode( ContainerId containerId, SchedulerNode node) { // Get the application for the finished container @@ -224,7 +248,7 @@ protected synchronized void containerLaunchedOnNode( application.containerLaunchedOnNode(containerId, node.getNodeID()); } - + protected void containerIncreasedOnNode(ContainerId containerId, SchedulerNode node, Container increasedContainerReportedByNM) { // Get the application for the finished container @@ -398,7 +422,7 @@ public synchronized void recoverContainersOnNode( // recover scheduler attempt schedulerAttempt.recoverContainer(schedulerNode, rmContainer); - + // set master container for the current running AMContainer for this // attempt. RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt(); @@ -444,7 +468,7 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status, } /** - * Recover resource request back from RMContainer when a container is + * Recover resource request back from RMContainer when a container is * preempted before AM pulled the same. If container is pulled by * AM, then RMContainer will not have resource request to recover. * @param rmContainer rmContainer @@ -594,7 +618,7 @@ protected abstract void decreaseContainer( SchedulerApplicationAttempt attempt); @Override - public SchedulerNode getSchedulerNode(NodeId nodeId) { + public N getSchedulerNode(NodeId nodeId) { return nodeTracker.getNode(nodeId); } @@ -789,4 +813,106 @@ private SchedContainerChangeRequest createSchedContainerChangeRequest( } return schedulerChangeRequests; } + + public Clock getClock() { + return clock; + } + + @VisibleForTesting + public void setClock(Clock clock) { + this.clock = clock; + } + + @Lock(Lock.NoLock.class) + public SchedulerNode getNode(NodeId nodeId) { + return nodeTracker.getNode(nodeId); + } + + /** + * Process a heartbeat update from a node. + * @param nm The RMNode corresponding to the NodeManager + */ + protected synchronized void nodeUpdate(RMNode nm) { + if (LOG.isDebugEnabled()) { + LOG.debug("nodeUpdate: " + nm + + " cluster capacity: " + getClusterResource()); + } + + Resource releasedResources = Resource.newInstance(0, 0); + SchedulerNode node = getNode(nm.getNodeID()); + + List containerInfoList = nm.pullContainerUpdates(); + List newlyLaunchedContainers = + new ArrayList<>(); + List completedContainers = + new ArrayList<>(); + for(UpdatedContainerInfo containerInfo : containerInfoList) { + newlyLaunchedContainers + .addAll(containerInfo.getNewlyLaunchedContainers()); + completedContainers.addAll(containerInfo.getCompletedContainers()); + } + + // Processing the newly launched containers + for (ContainerStatus launchedContainer : newlyLaunchedContainers) { + containerLaunchedOnNode(launchedContainer.getContainerId(), node); + } + + // Processing the newly increased containers + List newlyIncreasedContainers = + nm.pullNewlyIncreasedContainers(); + for (Container container : newlyIncreasedContainers) { + containerIncreasedOnNode(container.getId(), node, container); + } + + // Process completed containers + int releasedContainers = 0; + for (ContainerStatus completedContainer : completedContainers) { + ContainerId containerId = completedContainer.getContainerId(); + LOG.debug("Container FINISHED: " + containerId); + RMContainer container = getRMContainer(containerId); + completedContainer(getRMContainer(containerId), + completedContainer, RMContainerEventType.FINISHED); + if (container != null) { + releasedContainers++; + Resource rs = container.getAllocatedResource(); + if (rs != null) { + Resources.addTo(releasedResources, rs); + } + rs = container.getReservedResource(); + if (rs != null) { + Resources.addTo(releasedResources, rs); + } + } + } + + // If the node is decommissioning, send an update to have the total + // resource equal to the used resource, so no available resource to + // schedule. + // TODO: Fix possible race-condition when request comes in before + // update is propagated + if (nm.getState() == NodeState.DECOMMISSIONING) { + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption + .newInstance(getSchedulerNode(nm.getNodeID()) + .getAllocatedResource(), 0))); + } + + schedulerHealth.updateSchedulerReleaseDetails(getLastNodeUpdateTime(), + releasedResources); + schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); + + // Updating node resource utilization + node.setAggregatedContainersUtilization( + nm.getAggregatedContainersUtilization()); + node.setNodeUtilization(nm.getNodeUtilization()); + + // Now node data structures are up-to-date and ready for scheduling. + if(LOG.isDebugEnabled()) { + LOG.debug("Node being looked for scheduling " + nm + + " availableResource: " + node.getUnallocatedResource()); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ee62a701514304a2743063395368d5fc359d8611..8f2d6fec800861ff3e8616f2e25ebe69940f38a4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -54,7 +54,6 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -90,8 +89,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -105,7 +102,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; @@ -230,8 +226,6 @@ public Configuration getConf() { private boolean scheduleAsynchronously; private AsyncScheduleThread asyncSchedulerThread; private RMNodeLabelsManager labelManager; - private SchedulerHealth schedulerHealth = new SchedulerHealth(); - volatile long lastNodeUpdateTime; /** * EXPERT @@ -1039,86 +1033,15 @@ public QueueInfo getQueueInfo(String queueName, return root.getQueueUserAclInfo(user); } - private synchronized void nodeUpdate(RMNode nm) { - if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + - " clusterResources: " + getClusterResource()); - } - - Resource releaseResources = Resource.newInstance(0, 0); - - FiCaSchedulerNode node = getNode(nm.getNodeID()); - - List containerInfoList = nm.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Processing the newly increased containers - List newlyIncreasedContainers = - nm.pullNewlyIncreasedContainers(); - for (Container container : newlyIncreasedContainers) { - containerIncreasedOnNode(container.getId(), node, container); - } - - // Process completed containers - int releasedContainers = 0; - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - RMContainer container = getRMContainer(containerId); - super.completedContainer(container, completedContainer, - RMContainerEventType.FINISHED); - if (container != null) { - releasedContainers++; - Resource rs = container.getAllocatedResource(); - if (rs != null) { - Resources.addTo(releaseResources, rs); - } - rs = container.getReservedResource(); - if (rs != null) { - Resources.addTo(releaseResources, rs); - } - } - } - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. - // TODO: Fix possible race-condition when request comes in before - // update is propagated - if (nm.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(nm.getNodeID()) - .getAllocatedResource(), 0))); - } - schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime, - releaseResources); - schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); - - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); - - // Now node data structures are upto date and ready for scheduling. - if(LOG.isDebugEnabled()) { - LOG.debug("Node being looked for scheduling " + nm + - " availableResource: " + node.getUnallocatedResource()); + @Override + protected synchronized void nodeUpdate(RMNode nm) { + setLastNodeUpdateTime(Time.now()); + super.nodeUpdate(nm); + if (!scheduleAsynchronously) { + allocateContainersToNode(getNode(nm.getNodeID())); } } - + /** * Process resource update on a node. */ @@ -1215,7 +1138,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { return; } // reset allocation and reservation stats before we start doing any work - updateSchedulerHealth(lastNodeUpdateTime, node, + updateSchedulerHealth(getLastNodeUpdateTime(), node, new CSAssignment(Resources.none(), NodeType.NODE_LOCAL)); CSAssignment assignment; @@ -1253,7 +1176,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { tmp.getAssignmentInformation().addAllocationDetails( reservedContainer.getContainerId(), queue.getQueuePath()); tmp.getAssignmentInformation().incrAllocations(); - updateSchedulerHealth(lastNodeUpdateTime, node, tmp); + updateSchedulerHealth(getLastNodeUpdateTime(), node, tmp); schedulerHealth.updateSchedulerFulfilledReservationCounts(1); } } @@ -1277,7 +1200,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); if (Resources.greaterThan(calculator, getClusterResource(), assignment.getResource(), Resources.none())) { - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + updateSchedulerHealth(getLastNodeUpdateTime(), node, assignment); return; } @@ -1308,7 +1231,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { new ResourceLimits(labelManager.getResourceByLabel( RMNodeLabelsManager.NO_LABEL, getClusterResource())), SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + updateSchedulerHealth(getLastNodeUpdateTime(), node, assignment); } } else { LOG.info("Skipping scheduling since node " @@ -1360,12 +1283,7 @@ public void handle(SchedulerEvent event) { case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - RMNode node = nodeUpdatedEvent.getRMNode(); - setLastNodeUpdateTime(Time.now()); - nodeUpdate(node); - if (!scheduleAsynchronously) { - allocateContainersToNode(getNode(node.getNodeID())); - } + nodeUpdate(nodeUpdatedEvent.getRMNode()); } break; case APP_ADDED: @@ -2003,20 +1921,6 @@ private String handleMoveToPlanQueue(String targetQueueName) { } @Override - public SchedulerHealth getSchedulerHealth() { - return this.schedulerHealth; - } - - private void setLastNodeUpdateTime(long time) { - this.lastNodeUpdateTime = time; - } - - @Override - public long getLastNodeUpdateTime() { - return lastNodeUpdateTime; - } - - @Override public Priority checkAndGetApplicationPriority(Priority priorityFromContext, String user, String queueName, ApplicationId applicationId) throws YarnException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index bc953ba37379b64291c664a32d59fe95107a0de0..f6a500b9efc826d7472a6f3f87f4c77fefe6d3ba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -69,8 +68,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -91,8 +88,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -129,7 +124,6 @@ private Resource incrAllocation; private QueueManager queueMgr; - private volatile Clock clock; private boolean usePortForNodeName; private static final Log LOG = LogFactory.getLog(FairScheduler.class); @@ -214,7 +208,6 @@ public FairScheduler() { super(FairScheduler.class.getName()); - clock = SystemClock.getInstance(); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this); @@ -376,7 +369,7 @@ protected synchronized void update() { * threshold for each type of task. */ private void updateStarvationStats() { - lastPreemptionUpdateTime = clock.getTime(); + lastPreemptionUpdateTime = getClock().getTime(); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { sched.updateStarvationStats(); } @@ -603,15 +596,6 @@ public synchronized int getContinuousSchedulingSleepMs() { return continuousSchedulingSleepMs; } - public Clock getClock() { - return clock; - } - - @VisibleForTesting - void setClock(Clock clock) { - this.clock = clock; - } - public FairSchedulerEventLog getEventLog() { return eventLog; } @@ -1008,64 +992,15 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, preemptionContainerIds, null, null, application.pullUpdatedNMTokens()); } } - - /** - * Process a heartbeat update from a node. - */ - private synchronized void nodeUpdate(RMNode nm) { + + @Override + protected synchronized void nodeUpdate(RMNode nm) { long start = getClock().getTime(); - if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + - " cluster capacity: " + getClusterResource()); - } eventLog.log("HEARTBEAT", nm.getHostName()); - FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); - - List containerInfoList = nm.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Process completed containers - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), - completedContainer, RMContainerEventType.FINISHED); - } - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. - if (nm.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(nm.getNodeID()) - .getAllocatedResource(), 0))); - } - - if (continuousSchedulingEnabled) { - if (!completedContainers.isEmpty()) { - attemptScheduling(node); - } - } else { - attemptScheduling(node); - } + super.nodeUpdate(nm); - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); + FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID()); + attemptScheduling(fsNode); long duration = getClock().getTime() - start; fsOpDurations.addNodeUpdateDuration(duration); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index eaab495a0d01d5e37d0925b74d7e7910b54913ca..7d46e74ef2446e1730670cc4e9734ac9d781be39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -43,14 +43,12 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -69,8 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -384,10 +380,6 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, } } - private FiCaSchedulerNode getNode(NodeId nodeId) { - return nodeTracker.getNode(nodeId); - } - @VisibleForTesting public synchronized void addApplication(ApplicationId applicationId, String queue, String user, boolean isAppRecovering) { @@ -727,66 +719,6 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application return assignedContainers; } - private synchronized void nodeUpdate(RMNode rmNode) { - FiCaSchedulerNode node = getNode(rmNode.getNodeID()); - - List containerInfoList = rmNode.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Process completed containers - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), - completedContainer, RMContainerEventType.FINISHED); - } - - // Updating node resource utilization - node.setAggregatedContainersUtilization( - rmNode.getAggregatedContainersUtilization()); - node.setNodeUtilization(rmNode.getNodeUtilization()); - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. - if (rmNode.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(rmNode.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(rmNode.getNodeID()) - .getAllocatedResource(), 0))); - } - - if (rmContext.isWorkPreservingRecoveryEnabled() - && !rmContext.isSchedulerReadyForAllocatingContainers()) { - return; - } - - if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), - node.getUnallocatedResource(), minimumAllocation)) { - LOG.debug("Node heartbeat " + rmNode.getNodeID() + - " available resource = " + node.getUnallocatedResource()); - - assignContainers(node); - - LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " - + node.getUnallocatedResource()); - } - - updateAvailableResourcesMetrics(); - } - private void increaseUsedResources(RMContainer rmContainer) { Resources.addTo(usedResource, rmContainer.getAllocatedResource()); } @@ -904,7 +836,7 @@ protected synchronized void completedContainerInternal( container.getId().getApplicationAttemptId().getApplicationId(); // Get the node on which the container was allocated - FiCaSchedulerNode node = getNode(container.getNodeId()); + FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(container.getNodeId()); if (application == null) { LOG.info("Unknown application: " + appId + @@ -1019,4 +951,28 @@ protected void decreaseContainer( // TODO Auto-generated method stub } + + @Override + protected synchronized void nodeUpdate(RMNode nm) { + super.nodeUpdate(nm); + + FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(nm.getNodeID()); + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + return; + } + + if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), + node.getUnallocatedResource(), minimumAllocation)) { + LOG.debug("Node heartbeat " + nm.getNodeID() + + " available resource = " + node.getUnallocatedResource()); + + assignContainers(node); + + LOG.debug("Node after allocation " + nm.getNodeID() + " resource = " + + node.getUnallocatedResource()); + } + + updateAvailableResourcesMetrics(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index e3ef8c28fd4c6382ec09b9cdbd0218ca675b50ac..7b243d64732731827e9c342239945745c239bb4b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; @@ -44,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ -1059,7 +1059,7 @@ private void setResourceAndNodeDetails() { when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn( clusterResources); - SchedulerNode mNode = mock(SchedulerNode.class); + FiCaSchedulerNode mNode = mock(FiCaSchedulerNode.class); when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL); when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode); }