diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 1bc6f239c5ca2c79b471b7f31fcfd370f2547592..6dbf3373d9842f52bacd18bbd9cd0258fd251828 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -978,4 +979,8 @@ protected void decreaseContainer( } + @Override + protected void nodeUpdateInternal(RMNode node, + List completedContainers) { + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 8f03de25791eaea72597a9b887a2a6a6e731a8bb..fa85738929c624ebfcc34a791363a09ce2bdfafc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; @@ -71,9 +72,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.utils.Lock; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; @@ -94,9 +100,14 @@ protected Resource minimumAllocation; protected RMContext rmContext; - + private volatile Priority maxClusterLevelAppPriority; + protected SchedulerHealth schedulerHealth = new SchedulerHealth(); + private volatile long lastNodeUpdateTime; + + private volatile Clock clock; + /* * All schedulers which are inheriting AbstractYarnScheduler should use * concurrent version of 'applications' map. @@ -116,6 +127,7 @@ */ public AbstractYarnScheduler(String name) { super(name); + clock = SystemClock.getInstance(); } @Override @@ -208,6 +220,18 @@ protected void initMaximumResourceCapability(Resource maximumAllocation) { nodeTracker.setConfiguredMaxAllocation(maximumAllocation); } + public SchedulerHealth getSchedulerHealth() { + return this.schedulerHealth; + } + + protected void setLastNodeUpdateTime(long time) { + this.lastNodeUpdateTime = time; + } + + public long getLastNodeUpdateTime() { + return lastNodeUpdateTime; + } + protected synchronized void containerLaunchedOnNode( ContainerId containerId, SchedulerNode node) { // Get the application for the finished container @@ -781,4 +805,111 @@ private SchedContainerChangeRequest createSchedContainerChangeRequest( } return schedulerChangeRequests; } + + public Clock getClock() { + return clock; + } + + @VisibleForTesting + public void setClock(Clock clock) { + this.clock = clock; + } + + @Lock(Lock.NoLock.class) + public SchedulerNode getNode(NodeId nodeId) { + return nodeTracker.getNode(nodeId); + } + + /** + * Process a heartbeat update from a node. + * @param nm The RMNode corresponding to the NodeManager + */ + protected synchronized void nodeUpdate(RMNode nm) { + if (LOG.isDebugEnabled()) { + LOG.debug("nodeUpdate: " + nm + + " cluster capacity: " + getClusterResource()); + } + + Resource releaseResources = Resource.newInstance(0, 0); + SchedulerNode node = getNode(nm.getNodeID()); + + List containerInfoList = nm.pullContainerUpdates(); + List newlyLaunchedContainers = + new ArrayList(); + List completedContainers = + new ArrayList(); + for(UpdatedContainerInfo containerInfo : containerInfoList) { + newlyLaunchedContainers + .addAll(containerInfo.getNewlyLaunchedContainers()); + completedContainers.addAll(containerInfo.getCompletedContainers()); + } + + // Processing the newly launched containers + for (ContainerStatus launchedContainer : newlyLaunchedContainers) { + containerLaunchedOnNode(launchedContainer.getContainerId(), node); + } + + // Processing the newly increased containers + List newlyIncreasedContainers = + nm.pullNewlyIncreasedContainers(); + for (Container container : newlyIncreasedContainers) { + containerIncreasedOnNode(container.getId(), node, container); + } + + // Process completed containers + int releasedContainers = 0; + for (ContainerStatus completedContainer : completedContainers) { + ContainerId containerId = completedContainer.getContainerId(); + LOG.debug("Container FINISHED: " + containerId); + RMContainer container = getRMContainer(containerId); + completedContainer(getRMContainer(containerId), + completedContainer, RMContainerEventType.FINISHED); + if (container != null) { + releasedContainers++; + Resource rs = container.getAllocatedResource(); + if (rs != null) { + Resources.addTo(releaseResources, rs); + } + rs = container.getReservedResource(); + if (rs != null) { + Resources.addTo(releaseResources, rs); + } + } + } + + // If the node is decommissioning, send an update to have the total + // resource equal to the used resource, so no available resource to + // schedule. + // TODO: Fix possible race-condition when request comes in before + // update is propagated + if (nm.getState() == NodeState.DECOMMISSIONING) { + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption + .newInstance(getSchedulerNode(nm.getNodeID()) + .getAllocatedResource(), 0))); + } + + schedulerHealth.updateSchedulerReleaseDetails(getLastNodeUpdateTime(), + releaseResources); + schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); + + nodeUpdateInternal(nm, completedContainers); + + // Updating node resource utilization + node.setAggregatedContainersUtilization( + nm.getAggregatedContainersUtilization()); + node.setNodeUtilization(nm.getNodeUtilization()); + + // Now node data structures are up-to-date and ready for scheduling. + if(LOG.isDebugEnabled()) { + LOG.debug("Node being looked for scheduling " + nm + + " availableResource: " + node.getUnallocatedResource()); + } + } + + protected abstract void nodeUpdateInternal(RMNode node, + List completedContainers); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 920e983b0ab60e5da99a734c21a88b0bf8a574dd..248643d5f13e0fdb1711332b53cb692e91a7ede1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -54,7 +54,6 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -90,8 +89,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -105,7 +102,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; @@ -230,8 +226,6 @@ public Configuration getConf() { private boolean scheduleAsynchronously; private AsyncScheduleThread asyncSchedulerThread; private RMNodeLabelsManager labelManager; - private SchedulerHealth schedulerHealth = new SchedulerHealth(); - volatile long lastNodeUpdateTime; /** * EXPERT @@ -1046,84 +1040,9 @@ public QueueInfo getQueueInfo(String queueName, return root.getQueueUserAclInfo(user); } - private synchronized void nodeUpdate(RMNode nm) { - if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + - " clusterResources: " + getClusterResource()); - } - - Resource releaseResources = Resource.newInstance(0, 0); - - FiCaSchedulerNode node = getNode(nm.getNodeID()); - - List containerInfoList = nm.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Processing the newly increased containers - List newlyIncreasedContainers = - nm.pullNewlyIncreasedContainers(); - for (Container container : newlyIncreasedContainers) { - containerIncreasedOnNode(container.getId(), node, container); - } - - // Process completed containers - int releasedContainers = 0; - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - RMContainer container = getRMContainer(containerId); - super.completedContainer(container, completedContainer, - RMContainerEventType.FINISHED); - if (container != null) { - releasedContainers++; - Resource rs = container.getAllocatedResource(); - if (rs != null) { - Resources.addTo(releaseResources, rs); - } - rs = container.getReservedResource(); - if (rs != null) { - Resources.addTo(releaseResources, rs); - } - } - } - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. - // TODO: Fix possible race-condition when request comes in before - // update is propagated - if (nm.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(nm.getNodeID()) - .getAllocatedResource(), 0))); - } - schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime, - releaseResources); - schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); - - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); - - // Now node data structures are upto date and ready for scheduling. - if(LOG.isDebugEnabled()) { - LOG.debug("Node being looked for scheduling " + nm + - " availableResource: " + node.getUnallocatedResource()); - } + @Override + protected void nodeUpdateInternal(RMNode node, + List completedContainers) { } /** @@ -1222,7 +1141,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { return; } // reset allocation and reservation stats before we start doing any work - updateSchedulerHealth(lastNodeUpdateTime, node, + updateSchedulerHealth(getLastNodeUpdateTime(), node, new CSAssignment(Resources.none(), NodeType.NODE_LOCAL)); CSAssignment assignment; @@ -1260,7 +1179,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { tmp.getAssignmentInformation().addAllocationDetails( reservedContainer.getContainerId(), queue.getQueuePath()); tmp.getAssignmentInformation().incrAllocations(); - updateSchedulerHealth(lastNodeUpdateTime, node, tmp); + updateSchedulerHealth(getLastNodeUpdateTime(), node, tmp); schedulerHealth.updateSchedulerFulfilledReservationCounts(1); } } @@ -1284,7 +1203,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); if (Resources.greaterThan(calculator, getClusterResource(), assignment.getResource(), Resources.none())) { - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + updateSchedulerHealth(getLastNodeUpdateTime(), node, assignment); return; } @@ -1315,7 +1234,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { new ResourceLimits(labelManager.getResourceByLabel( RMNodeLabelsManager.NO_LABEL, getClusterResource())), SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + updateSchedulerHealth(getLastNodeUpdateTime(), node, assignment); } } else { LOG.info("Skipping scheduling since node " @@ -2010,20 +1929,6 @@ private String handleMoveToPlanQueue(String targetQueueName) { } @Override - public SchedulerHealth getSchedulerHealth() { - return this.schedulerHealth; - } - - private void setLastNodeUpdateTime(long time) { - this.lastNodeUpdateTime = time; - } - - @Override - public long getLastNodeUpdateTime() { - return lastNodeUpdateTime; - } - - @Override public Priority checkAndGetApplicationPriority(Priority priorityFromContext, String user, String queueName, ApplicationId applicationId) throws YarnException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 80095807f9420792ad9cd608983d6e89287827b6..a04fc4a9e74066eeaea88392f2e9e7259e465f46 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; @@ -180,7 +181,7 @@ public synchronized boolean containerCompleted(RMContainer rmContainer, return true; } - public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node, + public synchronized RMContainer allocate(NodeType type, SchedulerNode node, Priority priority, ResourceRequest request, Container container) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index c59ba12cbd08efabc70786827f25262ba62e7a52..2850c1ee3f5cf0fe3dbe6b8095ff876f167e7612 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -69,8 +68,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -91,8 +88,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -129,7 +124,6 @@ private Resource incrAllocation; private QueueManager queueMgr; - private volatile Clock clock; private boolean usePortForNodeName; private static final Log LOG = LogFactory.getLog(FairScheduler.class); @@ -212,7 +206,6 @@ public FairScheduler() { super(FairScheduler.class.getName()); - clock = SystemClock.getInstance(); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this); @@ -374,7 +367,7 @@ protected synchronized void update() { * threshold for each type of task. */ private void updateStarvationStats() { - lastPreemptionUpdateTime = clock.getTime(); + lastPreemptionUpdateTime = getClock().getTime(); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { sched.updateStarvationStats(); } @@ -601,15 +594,6 @@ public synchronized int getContinuousSchedulingSleepMs() { return continuousSchedulingSleepMs; } - public Clock getClock() { - return clock; - } - - @VisibleForTesting - void setClock(Clock clock) { - this.clock = clock; - } - public FairSchedulerEventLog getEventLog() { return eventLog; } @@ -1012,67 +996,27 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, preemptionContainerIds, null, null, application.pullUpdatedNMTokens()); } } - - /** - * Process a heartbeat update from a node. - */ - private synchronized void nodeUpdate(RMNode nm) { + + @Override + protected synchronized void nodeUpdate(RMNode nm) { long start = getClock().getTime(); - if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + - " cluster capacity: " + getClusterResource()); - } eventLog.log("HEARTBEAT", nm.getHostName()); - FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); - - List containerInfoList = nm.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Process completed containers - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), - completedContainer, RMContainerEventType.FINISHED); - } - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. - if (nm.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(nm.getNodeID()) - .getAllocatedResource(), 0))); - } + super.nodeUpdate(nm); + long duration = getClock().getTime() - start; + fsOpDurations.addNodeUpdateDuration(duration); + } + @Override + protected void nodeUpdateInternal(RMNode node, + List completedContainers) { + FSSchedulerNode fsNode = getFSSchedulerNode(node.getNodeID()); if (continuousSchedulingEnabled) { if (!completedContainers.isEmpty()) { - attemptScheduling(node); + attemptScheduling(fsNode); } } else { - attemptScheduling(node); + attemptScheduling(fsNode); } - - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); - - long duration = getClock().getTime() - start; - fsOpDurations.addNodeUpdateDuration(duration); } void continuousSchedulingAttempt() throws InterruptedException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 3e6225ff88144b3e86ce1a740ebfb70ec74063b3..1c4c8db04328bab6c87aaa75abf81f8d8f7d8813 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -43,14 +43,12 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -69,8 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -81,6 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -390,10 +387,6 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, } } - private FiCaSchedulerNode getNode(NodeId nodeId) { - return nodeTracker.getNode(nodeId); - } - @VisibleForTesting public synchronized void addApplication(ApplicationId applicationId, String queue, String user, boolean isAppRecovering) { @@ -499,7 +492,7 @@ private synchronized void doneApplicationAttempt( * * @param node node on which resources are available to be allocated */ - private void assignContainers(FiCaSchedulerNode node) { + private void assignContainers(SchedulerNode node) { LOG.debug("assignContainers:" + " node=" + node.getRMNode().getNodeAddress() + " #applications=" + applications.size()); @@ -559,7 +552,7 @@ private void assignContainers(FiCaSchedulerNode node) { } private int getMaxAllocatableContainers(FiCaSchedulerApp application, - Priority priority, FiCaSchedulerNode node, NodeType type) { + Priority priority, SchedulerNode node, NodeType type) { int maxContainers = 0; ResourceRequest offSwitchRequest = @@ -594,7 +587,7 @@ private int getMaxAllocatableContainers(FiCaSchedulerApp application, } - private int assignContainersOnNode(FiCaSchedulerNode node, + private int assignContainersOnNode(SchedulerNode node, FiCaSchedulerApp application, Priority priority ) { // Data-local @@ -621,7 +614,7 @@ private int assignContainersOnNode(FiCaSchedulerNode node, return (nodeLocalContainers + rackLocalContainers + offSwitchContainers); } - private int assignNodeLocalContainers(FiCaSchedulerNode node, + private int assignNodeLocalContainers(SchedulerNode node, FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = @@ -647,7 +640,7 @@ private int assignNodeLocalContainers(FiCaSchedulerNode node, return assignedContainers; } - private int assignRackLocalContainers(FiCaSchedulerNode node, + private int assignRackLocalContainers(SchedulerNode node, FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = @@ -672,7 +665,7 @@ private int assignRackLocalContainers(FiCaSchedulerNode node, return assignedContainers; } - private int assignOffSwitchContainers(FiCaSchedulerNode node, + private int assignOffSwitchContainers(SchedulerNode node, FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = @@ -685,7 +678,7 @@ private int assignOffSwitchContainers(FiCaSchedulerNode node, return assignedContainers; } - private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application, + private int assignContainer(SchedulerNode node, FiCaSchedulerApp application, Priority priority, int assignableContainers, ResourceRequest request, NodeType type) { LOG.debug("assignContainers:" + @@ -732,66 +725,6 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application return assignedContainers; } - private synchronized void nodeUpdate(RMNode rmNode) { - FiCaSchedulerNode node = getNode(rmNode.getNodeID()); - - List containerInfoList = rmNode.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Process completed containers - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), - completedContainer, RMContainerEventType.FINISHED); - } - - // Updating node resource utilization - node.setAggregatedContainersUtilization( - rmNode.getAggregatedContainersUtilization()); - node.setNodeUtilization(rmNode.getNodeUtilization()); - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. - if (rmNode.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(rmNode.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(rmNode.getNodeID()) - .getAllocatedResource(), 0))); - } - - if (rmContext.isWorkPreservingRecoveryEnabled() - && !rmContext.isSchedulerReadyForAllocatingContainers()) { - return; - } - - if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), - node.getUnallocatedResource(), minimumAllocation)) { - LOG.debug("Node heartbeat " + rmNode.getNodeID() + - " available resource = " + node.getUnallocatedResource()); - - assignContainers(node); - - LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " - + node.getUnallocatedResource()); - } - - updateAvailableResourcesMetrics(); - } - private void increaseUsedResources(RMContainer rmContainer) { Resources.addTo(usedResource, rmContainer.getAllocatedResource()); } @@ -836,7 +769,7 @@ public void handle(SchedulerEvent event) { { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - nodeUpdate(nodeUpdatedEvent.getRMNode()); + super.nodeUpdate(nodeUpdatedEvent.getRMNode()); } break; case APP_ADDED: @@ -909,7 +842,7 @@ protected synchronized void completedContainerInternal( container.getId().getApplicationAttemptId().getApplicationId(); // Get the node on which the container was allocated - FiCaSchedulerNode node = getNode(container.getNodeId()); + FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(container.getNodeId()); if (application == null) { LOG.info("Unknown application: " + appId + @@ -1024,4 +957,27 @@ protected void decreaseContainer( // TODO Auto-generated method stub } + + @Override + protected void nodeUpdateInternal(RMNode rmNode, + List completedContainers) { + SchedulerNode node = getNode(rmNode.getNodeID()); + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + return; + } + + if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), + node.getUnallocatedResource(), minimumAllocation)) { + LOG.debug("Node heartbeat " + rmNode.getNodeID() + + " available resource = " + node.getUnallocatedResource()); + + assignContainers(node); + + LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " + + node.getUnallocatedResource()); + } + + updateAvailableResourcesMetrics(); + } }