diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 817565b..53a37b4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -680,6 +680,7 @@ public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { try { event = eventQueue.take(); + SchedulerMetrics.getMetrics().handle(event, SchedulerMetrics.SchedulerEventOp.HANDLED); } catch (InterruptedException e) { LOG.error("Returning, interrupted : " + e); return; // TODO: Kill RM. @@ -731,6 +732,8 @@ public void handle(SchedulerEvent event) { LOG.info("Very low remaining capacity on scheduler event queue: " + remCapacity); } + // Order is important. Update counter first, then put it to the queue. + SchedulerMetrics.getMetrics().handle(event, SchedulerMetrics.SchedulerEventOp.ADDED); this.eventQueue.put(event); } catch (InterruptedException e) { LOG.info("Interrupted. Trying to exit gracefully."); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java old mode 100644 new mode 100755 index d69600a..776f41f --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -65,6 +65,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -98,7 +100,9 @@ private long configuredMaximumAllocationWaitTime; protected RMContext rmContext; - + + private volatile Clock clock; + /* * All schedulers which are inheriting AbstractYarnScheduler should use * concurrent version of 'applications' map. 
@@ -121,6 +125,15 @@ public AbstractYarnScheduler(String name) { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.maxAllocReadLock = lock.readLock(); this.maxAllocWriteLock = lock.writeLock(); + clock = new SystemClock(); + } + + public Clock getClock() { + return clock; + } + + protected void setClock(Clock clock) { + this.clock = clock; } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerMetrics.java new file mode 100755 index 0000000..ffadbb7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerMetrics.java @@ -0,0 +1,496 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+
+@InterfaceAudience.Private
+@Metrics(context="yarn")
+public class SchedulerMetrics {
+
+  private static final AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+  private static final MetricsInfo RECORD_INFO =
+      info("SchedulerMetrics", "Metrics for the Yarn Scheduler");
+  private static volatile SchedulerMetrics INSTANCE = null;
+  private static MetricsRegistry registry;
+
+  private static YarnScheduler scheduler;
+  private static QueueMetrics rootMetric;
+
+  private final Timer timer = new Timer("Scheduler metrics updater", true);
+  private static final long TIMER_START_DELAY_MS = 5000L;
+
+  public enum SchedulerLoad {
+    LIGHT, NORMAL, BUSY, HEAVY, UNKNOWN
+  }
+  private static SchedulerLoad schedulerLoad;
+
+  private static final long UPDATE_INTERVAL = 1000L;
+  private static final long LOOK_BACK_PERIOD = 10000L;
+  private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
+
+  public enum SchedulerEventOp {
+    ADDED, HANDLED
+  }
+
+  @Metric("# of waiting scheduler events") MutableGaugeInt numWaitingEvents;
+  @Metric("# of waiting node_add events") MutableGaugeInt numWaitingNodeAddEvents;
+  @Metric("# of waiting node_remove events") MutableGaugeInt numWaitingNodeRemoveEvents;
+  @Metric("# of waiting node_update events") MutableGaugeInt numWaitingNodeUpdateEvents;
+  @Metric("# of waiting node_resource_update events") MutableGaugeInt numWaitingNodeResourceUpdateEvents;
+  @Metric("# of waiting node_labels_update events") MutableGaugeInt numWaitingNodeLabelsUpdateEvents;
+  @Metric("# of waiting app_add events") MutableGaugeInt numWaitingAppAddEvents;
+  @Metric("# of waiting app_remove events") MutableGaugeInt numWaitingAppRemoveEvents;
+  @Metric("# of waiting attempt_add events") MutableGaugeInt numWaitingAttemptAddEvents;
+  @Metric("# of waiting attempt_remove events") MutableGaugeInt numWaitingAttemptRemoveEvents;
+  @Metric("# of waiting container_expired events") MutableGaugeInt numWaitingContainerExpiredEvents;
+  @Metric("# of waiting drop_reservation events") MutableGaugeInt numWaitingDropReservationEvents;
+  @Metric("# of waiting preempt_container events") MutableGaugeInt numWaitingPreemptContainerEvents;
+  @Metric("# of waiting kill_container events") MutableGaugeInt numWaitingKillContainerEvents;
+
+  @Metric("Stat of waiting scheduler events") MutableStat numWaitingEventsStat;
+  @Metric("Stat of waiting node_update events") MutableStat numWaitingNodeUpdateEventsStat;
+
+  @Metric("Rate of scheduler events handled") MutableRate eventsHandlingRate;
+  @Metric("Rate of node_update events handled") MutableRate nodeUpdateHandlingRate;
+
+  @Metric("Rate of scheduler events added") MutableRate eventsAddingRate;
+  @Metric("Rate of node update events added") MutableRate nodeUpdateAddingRate;
+
+  @Metric("Rate of container allocation") MutableRate containerAllocationRate;
+  // schedulingExecRate equals nodeUpdateHandlingRate
+  // if asynchronous scheduling (continuous scheduling in fair) is disabled.
+  @Metric("Rate at which scheduling is executed") MutableRate schedulingExecRate;
+  @Metric("Latency of app allocate") MutableStat appAllocateDurationStat;
+  @Metric("Latency of node update") MutableStat nodeUpdateDurationStat;
+  @Metric("Duration of scheduling call") MutableStat schedulingDurationStat;
+
+  // Stat of the scheduling duration accumulated within a second, which can
+  // reflect the scheduler load.
+  @Metric("Stat of scheduling duration accumulation within a second") MutableRate schedulingAccumulationStat;
+  // Records the scheduling duration accumulated within the current second.
+  private AtomicLong schedulingAccumulation = new AtomicLong(0);
+  // Counts how many times the scheduling method has been called.
+  private AtomicInteger schedulingExecCounter = new AtomicInteger(0);
+
+  /**
+   * Index mapping for the counter arrays:
+   * 0: sum of all events
+   * 1: NODE_ADDED
+   * 2: NODE_REMOVED
+   * 3: NODE_UPDATE
+   * 4: NODE_RESOURCE_UPDATE
+   * 5: NODE_LABELS_UPDATE
+   * 6: APP_ADDED
+   * 7: APP_REMOVED
+   * 8: APP_ATTEMPT_ADDED
+   * 9: APP_ATTEMPT_REMOVED
+   * 10: CONTAINER_EXPIRED
+   * 11: DROP_RESERVATION
+   * 12: PREEMPT_CONTAINER
+   * 13: KILL_CONTAINER
+   */
+  private static final int NUM_SCHEDULER_EVENTS_TYPES = SchedulerEventType.values().length;
+  private final AtomicInteger[] eventsAddingCounter = new AtomicInteger[NUM_SCHEDULER_EVENTS_TYPES];
+  private final AtomicInteger[] eventsTakingCounter = new AtomicInteger[NUM_SCHEDULER_EVENTS_TYPES];
+
+
+  // SchedulerMetrics should be a singleton.
+  private SchedulerMetrics() {
+    registry = new MetricsRegistry(RECORD_INFO);
+    registry.tag(RECORD_INFO, "ResourceManager");
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    if (ms != null) {
+      ms.register("SchedulerMetrics", "Metrics for the Yarn Scheduler",
+          this);
+    }
+    rootMetric = scheduler.getRootQueueMetrics();
+    schedulerLoad = SchedulerLoad.NORMAL;
+    for (int i = 0; i < NUM_SCHEDULER_EVENTS_TYPES; i++) {
+      eventsAddingCounter[i] = new AtomicInteger(0);
+      eventsTakingCounter[i] = new AtomicInteger(0);
+    }
+    startTimerTask();
+  }
+
+  private static final Log LOG = LogFactory.getLog(SchedulerMetrics.class);
+
+  public static synchronized void initMetrics(YarnScheduler sched) {
+    if (!isInitialized.get()) {
+      scheduler = sched;
+      if (INSTANCE == null) {
+        INSTANCE = new SchedulerMetrics();
+        INSTANCE.enableMovingAvg();
+        isInitialized.set(true);
+      }
+      LOG.info("SchedulerMetrics initialized.");
+    } else {
+      LOG.info("SchedulerMetrics has already been initialized.");
+    }
+  }
+
+  public static synchronized void reInitMetrics(YarnScheduler sched) {
+    destroyMetrics();
+    initMetrics(sched);
+    LOG.info("SchedulerMetrics reinitialized.");
+  }
+
+  public static synchronized void destroyMetrics() {
+    isInitialized.set(false);
+    scheduler = null;
+    // Guard against destroying metrics that were never initialized.
+    if (INSTANCE != null) {
+      INSTANCE.stopTimerTask();
+      INSTANCE = null;
+    }
+  }
+
+  public static SchedulerMetrics getMetrics() {
+    if (!isInitialized.get()) {
+      LOG.error("SchedulerMetrics has not been initialized; call initMetrics() first.");
+    }
+    return INSTANCE;
+  }
+
+  private synchronized void enableMovingAvg() {
+    numWaitingEventsStat.enableMovingAvg(UPDATE_INTERVAL, LOOK_BACK_PERIOD, UNIT);
+    numWaitingNodeUpdateEventsStat.enableMovingAvg(UPDATE_INTERVAL, LOOK_BACK_PERIOD, UNIT);
+
+    eventsHandlingRate.enableMovingAvg(UPDATE_INTERVAL, LOOK_BACK_PERIOD, UNIT);
+    nodeUpdateHandlingRate.enableMovingAvg(UPDATE_INTERVAL, LOOK_BACK_PERIOD, UNIT);
+
+    eventsAddingRate.enableMovingAvg(UPDATE_INTERVAL, LOOK_BACK_PERIOD, UNIT);
+    nodeUpdateAddingRate.enableMovingAvg(UPDATE_INTERVAL, LOOK_BACK_PERIOD, UNIT);
+
+    containerAllocationRate.enableMovingAvg(UPDATE_INTERVAL, LOOK_BACK_PERIOD, UNIT);
+    appAllocateDurationStat.enableMovingAvg(UPDATE_INTERVAL, LOOK_BACK_PERIOD, UNIT);
+    nodeUpdateDurationStat.enableMovingAvg(UPDATE_INTERVAL, LOOK_BACK_PERIOD, UNIT);
+    schedulingDurationStat.enableMovingAvg(UPDATE_INTERVAL, LOOK_BACK_PERIOD, UNIT);
+  }
+
+  public void handle(SchedulerEvent event, SchedulerEventOp opt) {
+    switch (opt) {
+    case ADDED:
+      switch (event.getType()) {
+      case NODE_ADDED:
+        eventsAddingCounter[1].incrementAndGet();
+        numWaitingNodeAddEvents.incr();
+        break;
+      case NODE_REMOVED:
+        eventsAddingCounter[2].incrementAndGet();
+        numWaitingNodeRemoveEvents.incr();
+        break;
+      case NODE_UPDATE:
+        eventsAddingCounter[3].incrementAndGet();
+        numWaitingNodeUpdateEvents.incr();
+        break;
+      case NODE_RESOURCE_UPDATE:
+        eventsAddingCounter[4].incrementAndGet();
+        numWaitingNodeResourceUpdateEvents.incr();
+        break;
+      case NODE_LABELS_UPDATE:
+        eventsAddingCounter[5].incrementAndGet();
+        numWaitingNodeLabelsUpdateEvents.incr();
+        break;
+      case APP_ADDED:
+        eventsAddingCounter[6].incrementAndGet();
+        numWaitingAppAddEvents.incr();
+        break;
+      case APP_REMOVED:
+        eventsAddingCounter[7].incrementAndGet();
+        numWaitingAppRemoveEvents.incr();
+        break;
+      case APP_ATTEMPT_ADDED:
+        eventsAddingCounter[8].incrementAndGet();
+        numWaitingAttemptAddEvents.incr();
+        break;
+      case APP_ATTEMPT_REMOVED:
+        eventsAddingCounter[9].incrementAndGet();
+        numWaitingAttemptRemoveEvents.incr();
+        break;
+ case CONTAINER_EXPIRED: + eventsAddingCounter[10].incrementAndGet(); + numWaitingContainerExpiredEvents.incr(); + break; + case DROP_RESERVATION: + eventsAddingCounter[11].incrementAndGet(); + numWaitingDropReservationEvents.incr(); + break; + case PREEMPT_CONTAINER: + eventsAddingCounter[12].incrementAndGet(); + numWaitingPreemptContainerEvents.incr(); + break; + case KILL_CONTAINER: + eventsAddingCounter[13].incrementAndGet(); + numWaitingKillContainerEvents.incr(); + break; + default: + LOG.error("Unknown scheduler event type: " + event.getType()); + } + eventsAddingCounter[0].incrementAndGet(); + numWaitingEvents.incr(); + break; + case HANDLED: + switch (event.getType()) { + case NODE_ADDED: + eventsTakingCounter[1].incrementAndGet(); + numWaitingNodeAddEvents.decr(); + break; + case NODE_REMOVED: + eventsTakingCounter[2].incrementAndGet(); + numWaitingNodeRemoveEvents.decr(); + break; + case NODE_UPDATE: + eventsTakingCounter[3].incrementAndGet(); + numWaitingNodeUpdateEvents.decr(); + break; + case NODE_RESOURCE_UPDATE: + eventsTakingCounter[4].incrementAndGet(); + numWaitingNodeResourceUpdateEvents.decr(); + break; + case NODE_LABELS_UPDATE: + eventsTakingCounter[5].incrementAndGet(); + numWaitingNodeLabelsUpdateEvents.decr(); + break; + case APP_ADDED: + eventsTakingCounter[6].incrementAndGet(); + numWaitingAppAddEvents.decr(); + break; + case APP_REMOVED: + eventsTakingCounter[7].incrementAndGet(); + numWaitingAppRemoveEvents.decr(); + break; + case APP_ATTEMPT_ADDED: + eventsTakingCounter[8].incrementAndGet(); + numWaitingAttemptAddEvents.decr(); + break; + case APP_ATTEMPT_REMOVED: + eventsTakingCounter[9].incrementAndGet(); + numWaitingAttemptRemoveEvents.decr(); + break; + case CONTAINER_EXPIRED: + eventsTakingCounter[10].incrementAndGet(); + numWaitingContainerExpiredEvents.decr(); + break; + case DROP_RESERVATION: + eventsTakingCounter[11].incrementAndGet(); + numWaitingDropReservationEvents.decr(); + break; + case PREEMPT_CONTAINER: + eventsTakingCounter[12].incrementAndGet(); + numWaitingPreemptContainerEvents.decr(); + break; + case KILL_CONTAINER: + eventsTakingCounter[13].incrementAndGet(); + numWaitingKillContainerEvents.decr(); + break; + default: + LOG.error("Unknown scheduler event type: " + event.getType()); + } + eventsTakingCounter[0].incrementAndGet(); + numWaitingEvents.decr(); + break; + default: + LOG.error("Unknown scheduler event options: " + opt.toString()); + } + } + + private void resetAllCounters() { + for (int i = 0; i < NUM_SCHEDULER_EVENTS_TYPES; i++) { + eventsAddingCounter[i].set(0); + eventsTakingCounter[i].set(0); + } + } + + private void startTimerTask() { + TimerTask metricsUpdateTask = new TimerTask() { + long lastRecord = rootMetric.getAggregateAllocatedContainers(); + @Override + public void run() { + numWaitingEventsStat.add(numWaitingEvents.value()); + numWaitingNodeUpdateEventsStat.add(numWaitingNodeUpdateEvents.value()); + + eventsHandlingRate.add(eventsTakingCounter[0].getAndSet(0)); + nodeUpdateHandlingRate.add(eventsTakingCounter[3].getAndSet(0)); + + eventsAddingRate.add(eventsAddingCounter[0].getAndSet(0)); + nodeUpdateAddingRate.add(eventsAddingCounter[3].getAndSet(0)); + + containerAllocationRate.add(rootMetric.getAggregateAllocatedContainers() - lastRecord); + lastRecord = rootMetric.getAggregateAllocatedContainers(); + + schedulingAccumulationStat.add(schedulingAccumulation.getAndSet(0)); + schedulingExecRate.add(schedulingExecCounter.getAndSet(0)); + + evaluateLoad(); + } + }; + timer.scheduleAtFixedRate(metricsUpdateTask, + 
+        TIMER_START_DELAY_MS, getMetricUpdateIntervalMills());
+  }
+
+  private void stopTimerTask() {
+    timer.cancel();
+  }
+
+  public void evaluateLoad() {
+    // If there is no stat data yet, or the number of samples is too small,
+    // the scheduler load cannot be evaluated.
+    if (getNumWaitingEventsStat() == null || getSchedulingAccumulationStat() == null ||
+        getSchedulingAccumulationStat().numSamples() < 100) {
+      schedulerLoad = SchedulerLoad.UNKNOWN;
+      return;
+    }
+
+    // Use the collected statistics to evaluate the load of the scheduler.
+    final double curNumWaitingEvents = getNumWaitingEventsStat().movingAvg();
+    final double curSchedulingAccumulation = getSchedulingAccumulationStat().movingAvg();
+    final double schedulingCPURatio = curSchedulingAccumulation / getMetricUpdateIntervalMills();
+
+    // For a NORMAL or LIGHT scheduler, events should rarely accumulate
+    // in the dispatch queue.
+    final int numActiveNodes = scheduler.getNumClusterNodes();
+    // Guard against dividing by zero before any node has registered.
+    if (numActiveNodes <= 0) {
+      schedulerLoad = SchedulerLoad.UNKNOWN;
+      return;
+    }
+    if (curNumWaitingEvents / numActiveNodes < 0.01) {
+      if (schedulingCPURatio < 0.1) {
+        schedulerLoad = SchedulerLoad.LIGHT;
+      } else if (schedulingCPURatio < 0.6) {
+        schedulerLoad = SchedulerLoad.NORMAL;
+      } else {
+        schedulerLoad = SchedulerLoad.BUSY;
+      }
+    } else if (curNumWaitingEvents / numActiveNodes < 0.1) {
+      schedulerLoad = SchedulerLoad.BUSY;
+    } else {
+      schedulerLoad = SchedulerLoad.HEAVY;
+    }
+  }
+
+  // Waiting events info in scheduler dispatch queue
+  public int getNumWaitingEvents() {
+    return numWaitingEvents.value();
+  }
+
+  public int getNumWaitingNodeUpdateEvents() {
+    return numWaitingNodeUpdateEvents.value();
+  }
+
+  // Stat of waiting events
+  public MutableRate.StatInfo getNumWaitingEventsStat() {
+    return numWaitingEventsStat.getHistStatInfo();
+  }
+
+  public MutableRate.StatInfo getNumWaitingNodeUpdateEventsStat() {
+    return numWaitingNodeUpdateEventsStat.getHistStatInfo();
+  }
+
+  // Scheduler events handling
+  public MutableRate.StatInfo getEventsHandlingStat() {
+    return eventsHandlingRate.getHistStatInfo();
+  }
+
+  // Scheduler events adding
+  public MutableRate.StatInfo getEventsAddingStat() {
+    return eventsAddingRate.getHistStatInfo();
+  }
+
+  // Node Update events handling
+  public MutableRate.StatInfo getNodeUpdateHandlingStat() {
+    return nodeUpdateHandlingRate.getHistStatInfo();
+  }
+
+  // Node update events adding
+  public MutableRate.StatInfo getNodeUpdateAddingStat() {
+    return nodeUpdateAddingRate.getHistStatInfo();
+  }
+
+  // Container allocation
+  public void updateContainerAllocationRate(long rate) {
+    containerAllocationRate.add(rate);
+  }
+
+  public MutableRate.StatInfo getContainerAllocationStat() {
+    return containerAllocationRate.getHistStatInfo();
+  }
+
+  // App allocate duration
+  public void updateAppAllocateDurationStat(long duration) {
+    appAllocateDurationStat.add(duration);
+  }
+
+  public MutableRate.StatInfo getAppAllocateDurationStat() {
+    return appAllocateDurationStat.getHistStatInfo();
+  }
+
+  // Node update duration
+  public void updateNodeUpdateDurationStat(long duration) {
+    nodeUpdateDurationStat.add(duration);
+  }
+
+  public MutableRate.StatInfo getNodeUpdateDurationStat() {
+    return nodeUpdateDurationStat.getHistStatInfo();
+  }
+
+  // Scheduling duration accumulation within a second
+  public MutableRate.StatInfo getSchedulingAccumulationStat() {
+    return schedulingAccumulationStat.getHistStatInfo();
+  }
+
+  // Scheduling call duration
+  public void updateSchedulingDurationStat(long duration) {
+    schedulingDurationStat.add(duration);
+    schedulingAccumulation.addAndGet(duration);
+    schedulingExecCounter.incrementAndGet();
+  }
+
+  public MutableRate.StatInfo getSchedulingDurationStat() {
+    return schedulingDurationStat.getHistStatInfo();
+  }
+
+  public MutableRate.StatInfo getSchedulingExecRate() {
+    return schedulingExecRate.getHistStatInfo();
+  }
+
+  // Scheduler load info
+  public SchedulerLoad getSchedulerLoad() {
+    return schedulerLoad;
+  }
+
+  public long getMetricUpdateIntervalMills() {
+    return UNIT.toMillis(UPDATE_INTERVAL);
+  }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
old mode 100644
new mode 100755
index 1d353a6..e2bd607
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -46,7 +46,6 @@
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -97,6 +96,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
@@ -356,6 +356,7 @@ public void serviceInit(Configuration conf) throws Exception {
     Configuration configuration = new Configuration(conf);
     super.serviceInit(conf);
     initScheduler(configuration);
+    SchedulerMetrics.initMetrics(this);
   }
 
   @Override
@@ -896,63 +897,66 @@ private synchronized void doneApplicationAttempt(
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release,
       List<String> blacklistAdditions, List<String> blacklistRemovals) {
-
-    FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
-    if (application == null) {
-      return EMPTY_ALLOCATION;
-    }
-
-    // Sanity check
-    SchedulerUtils.normalizeRequests(
-        ask, getResourceCalculator(), getClusterResource(),
-        getMinimumResourceCapability(), getMaximumResourceCapability());
-
-    // Release containers
-    releaseContainers(release, application);
-
-    Allocation allocation;
-
-    LeafQueue updateDemandForQueue = null;
-
-    synchronized (application) {
-
-      // make sure we aren't stopping/removing the application
-      // when the allocate comes in
-      if (application.isStopped()) {
+    long start = getClock().getTime();
+    try {
+      FiCaSchedulerApp application =
getApplicationAttempt(applicationAttemptId); + if (application == null) { return EMPTY_ALLOCATION; } - - if (!ask.isEmpty()) { - - if(LOG.isDebugEnabled()) { - LOG.debug("allocate: pre-update " + applicationAttemptId + - " ask size =" + ask.size()); - application.showRequests(); - } - - // Update application requests - if (application.updateResourceRequests(ask)) { - updateDemandForQueue = (LeafQueue) application.getQueue(); + + // Sanity check + SchedulerUtils.normalizeRequests( + ask, getResourceCalculator(), getClusterResource(), + getMinimumResourceCapability(), getMaximumResourceCapability()); + + // Release containers + releaseContainers(release, application); + + Allocation allocation; + + LeafQueue updateDemandForQueue = null; + + synchronized (application) { + + // make sure we aren't stopping/removing the application + // when the allocate comes in + if (application.isStopped()) { + return EMPTY_ALLOCATION; } - - if(LOG.isDebugEnabled()) { - LOG.debug("allocate: post-update"); - application.showRequests(); + + if (!ask.isEmpty()) { + + if(LOG.isDebugEnabled()) { + LOG.debug("allocate: pre-update " + applicationAttemptId + + " ask size =" + ask.size()); + application.showRequests(); + } + + // Update application requests + if (application.updateResourceRequests(ask)) { + updateDemandForQueue = (LeafQueue) application.getQueue(); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("allocate: post-update"); + application.showRequests(); + } } + + application.updateBlacklist(blacklistAdditions, blacklistRemovals); + + allocation = application.getAllocation(getResourceCalculator(), + clusterResource, getMinimumResourceCapability()); } - - application.updateBlacklist(blacklistAdditions, blacklistRemovals); - - allocation = application.getAllocation(getResourceCalculator(), - clusterResource, getMinimumResourceCapability()); - } - - if (updateDemandForQueue != null) { - updateDemandForQueue.getOrderingPolicy().demandUpdated(application); + + if (updateDemandForQueue != null) { + updateDemandForQueue.getOrderingPolicy().demandUpdated(application); + } + + return allocation; + } finally { + SchedulerMetrics.getMetrics().updateAppAllocateDurationStat(getClock().getTime() - start); } - - return allocation; - } @Override @@ -1122,33 +1126,66 @@ private void updateSchedulerHealth(long now, FiCaSchedulerNode node, } private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { - if (rmContext.isWorkPreservingRecoveryEnabled() - && !rmContext.isSchedulerReadyForAllocatingContainers()) { - return; - } - // reset allocation and reservation stats before we start doing any work - updateSchedulerHealth(lastNodeUpdateTime, node, - new CSAssignment(Resources.none(), NodeType.NODE_LOCAL)); - - CSAssignment assignment; - - // Assign new containers... - // 1. Check for reserved applications - // 2. 
Schedule if there are no reservations - - RMContainer reservedContainer = node.getReservedContainer(); - if (reservedContainer != null) { - FiCaSchedulerApp reservedApplication = - getCurrentAttemptForContainer(reservedContainer.getContainerId()); - - // Try to fulfill the reservation - LOG.info("Trying to fulfill reservation for application " - + reservedApplication.getApplicationId() + " on node: " - + node.getNodeID()); - - LeafQueue queue = ((LeafQueue) reservedApplication.getQueue()); - assignment = - queue.assignContainers( + long start = getClock().getTime(); + try { + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + return; + } + // reset allocation and reservation stats before we start doing any work + updateSchedulerHealth(lastNodeUpdateTime, node, + new CSAssignment(Resources.none(), NodeType.NODE_LOCAL)); + + CSAssignment assignment; + + // Assign new containers... + // 1. Check for reserved applications + // 2. Schedule if there are no reservations + + RMContainer reservedContainer = node.getReservedContainer(); + if (reservedContainer != null) { + FiCaSchedulerApp reservedApplication = + getCurrentAttemptForContainer(reservedContainer.getContainerId()); + + // Try to fulfill the reservation + LOG.info("Trying to fulfill reservation for application " + + reservedApplication.getApplicationId() + " on node: " + + node.getNodeID()); + + LeafQueue queue = ((LeafQueue) reservedApplication.getQueue()); + assignment = + queue.assignContainers( + clusterResource, + node, + // TODO, now we only consider limits for parent for non-labeled + // resources, should consider labeled resources as well. + new ResourceLimits(labelManager.getResourceByLabel( + RMNodeLabelsManager.NO_LABEL, clusterResource)), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + if (assignment.isFulfilledReservation()) { + CSAssignment tmp = + new CSAssignment(reservedContainer.getReservedResource(), + assignment.getType()); + Resources.addTo(assignment.getAssignmentInformation().getAllocated(), + reservedContainer.getReservedResource()); + tmp.getAssignmentInformation().addAllocationDetails( + reservedContainer.getContainerId(), queue.getQueuePath()); + tmp.getAssignmentInformation().incrAllocations(); + updateSchedulerHealth(lastNodeUpdateTime, node, tmp); + schedulerHealth.updateSchedulerFulfilledReservationCounts(1); + } + } + + // Try to schedule more if there are no reservations to fulfill + if (node.getReservedContainer() == null) { + if (calculator.computeAvailableContainers(node.getAvailableResource(), + minimumAllocation) > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to schedule on node: " + node.getNodeName() + + ", available: " + node.getAvailableResource()); + } + + assignment = root.assignContainers( clusterResource, node, // TODO, now we only consider limits for parent for non-labeled @@ -1156,78 +1193,50 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { new ResourceLimits(labelManager.getResourceByLabel( RMNodeLabelsManager.NO_LABEL, clusterResource)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - if (assignment.isFulfilledReservation()) { - CSAssignment tmp = - new CSAssignment(reservedContainer.getReservedResource(), - assignment.getType()); - Resources.addTo(assignment.getAssignmentInformation().getAllocated(), - reservedContainer.getReservedResource()); - tmp.getAssignmentInformation().addAllocationDetails( - reservedContainer.getContainerId(), queue.getQueuePath()); - 
tmp.getAssignmentInformation().incrAllocations(); - updateSchedulerHealth(lastNodeUpdateTime, node, tmp); - schedulerHealth.updateSchedulerFulfilledReservationCounts(1); - } - } - - // Try to schedule more if there are no reservations to fulfill - if (node.getReservedContainer() == null) { - if (calculator.computeAvailableContainers(node.getAvailableResource(), - minimumAllocation) > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to schedule on node: " + node.getNodeName() + - ", available: " + node.getAvailableResource()); - } - - assignment = root.assignContainers( - clusterResource, - node, - // TODO, now we only consider limits for parent for non-labeled - // resources, should consider labeled resources as well. - new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, clusterResource)), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - if (Resources.greaterThan(calculator, clusterResource, - assignment.getResource(), Resources.none())) { - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); - return; - } - - // Only do non-exclusive allocation when node has node-labels. - if (StringUtils.equals(node.getPartition(), - RMNodeLabelsManager.NO_LABEL)) { - return; - } - - // Only do non-exclusive allocation when the node-label supports that - try { - if (rmContext.getNodeLabelManager().isExclusiveNodeLabel( - node.getPartition())) { + if (Resources.greaterThan(calculator, clusterResource, + assignment.getResource(), Resources.none())) { + updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + return; + } + + // Only do non-exclusive allocation when node has node-labels. + if (StringUtils.equals(node.getPartition(), + RMNodeLabelsManager.NO_LABEL)) { return; } - } catch (IOException e) { - LOG.warn("Exception when trying to get exclusivity of node label=" - + node.getPartition(), e); - return; + + // Only do non-exclusive allocation when the node-label supports that + try { + if (rmContext.getNodeLabelManager().isExclusiveNodeLabel( + node.getPartition())) { + return; + } + } catch (IOException e) { + LOG.warn("Exception when trying to get exclusivity of node label=" + + node.getPartition(), e); + return; + } + + // Try to use NON_EXCLUSIVE + assignment = root.assignContainers( + clusterResource, + node, + // TODO, now we only consider limits for parent for non-labeled + // resources, should consider labeled resources as well. + new ResourceLimits(labelManager.getResourceByLabel( + RMNodeLabelsManager.NO_LABEL, clusterResource)), + SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); + updateSchedulerHealth(lastNodeUpdateTime, node, assignment); } - - // Try to use NON_EXCLUSIVE - assignment = root.assignContainers( - clusterResource, - node, - // TODO, now we only consider limits for parent for non-labeled - // resources, should consider labeled resources as well. 
- new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, clusterResource)), - SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); - updateSchedulerHealth(lastNodeUpdateTime, node, assignment); + } else { + LOG.info("Skipping scheduling since node " + + node.getNodeID() + + " is reserved by application " + + node.getReservedContainer().getContainerId() + .getApplicationAttemptId()); } - } else { - LOG.info("Skipping scheduling since node " - + node.getNodeID() - + " is reserved by application " - + node.getReservedContainer().getContainerId() - .getApplicationAttemptId()); + } finally { + SchedulerMetrics.getMetrics().updateSchedulingDurationStat(getClock().getTime() - start); } } @@ -1273,11 +1282,13 @@ public void handle(SchedulerEvent event) { { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; RMNode node = nodeUpdatedEvent.getRMNode(); - setLastNodeUpdateTime(Time.now()); + setLastNodeUpdateTime(getClock().getTime()); + long start = getClock().getTime(); nodeUpdate(node); if (!scheduleAsynchronously) { allocateContainersToNode(getNode(node.getNodeID())); } + SchedulerMetrics.getMetrics().updateNodeUpdateDurationStat(getClock().getTime() - start); } break; case APP_ADDED: @@ -1472,7 +1483,7 @@ protected synchronized void completedContainer(RMContainer rmContainer, rmContainer, containerStatus, event, null, true); if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) { - schedulerHealth.updatePreemption(Time.now(), container.getNodeId(), + schedulerHealth.updatePreemption(getClock().getTime(), container.getNodeId(), container.getId(), queue.getQueuePath()); schedulerHealth.updateSchedulerPreemptionCounts(1); } else { @@ -1535,8 +1546,8 @@ public void killContainer(RMContainer cont) { LOG.debug("KILL_CONTAINER: container" + cont.toString()); } completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus( - cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), - RMContainerEventType.KILL); + cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), + RMContainerEventType.KILL); } @Override @@ -1772,7 +1783,7 @@ private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException { } return EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU); } - + @Override public Resource getMaximumResourceCapability(String queueName) { CSQueue queue = getQueue(queueName); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java old mode 100644 new mode 100755 index 3eefb8f..8f44040 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -19,7 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import 
java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; @@ -69,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -84,7 +92,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -121,7 +128,6 @@ private Resource incrAllocation; private QueueManager queueMgr; - private volatile Clock clock; private boolean usePortForNodeName; private static final Log LOG = LogFactory.getLog(FairScheduler.class); @@ -197,7 +203,6 @@ public FairScheduler() { super(FairScheduler.class.getName()); - clock = new SystemClock(); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this); @@ -346,7 +351,7 @@ protected synchronized void update() { * threshold for each type of task. 
 */
  private void updateStarvationStats() {
-    lastPreemptionUpdateTime = clock.getTime();
+    lastPreemptionUpdateTime = getClock().getTime();
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
       sched.updateStarvationStats();
     }
@@ -570,13 +575,10 @@ public synchronized int getContinuousSchedulingSleepMs() {
     return continuousSchedulingSleepMs;
   }
 
-  public Clock getClock() {
-    return clock;
-  }
-
+  @Override
   @VisibleForTesting
-  void setClock(Clock clock) {
-    this.clock = clock;
+  protected void setClock(Clock clock) {
+    super.setClock(clock);
   }
 
   public FairSchedulerEventLog getEventLog() {
@@ -905,69 +907,73 @@ private synchronized void removeNode(RMNode rmNode) {
   public Allocation allocate(ApplicationAttemptId appAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release,
       List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    long start = getClock().getTime();
+    try {
+      // Make sure this application exists
+      FSAppAttempt application = getSchedulerApp(appAttemptId);
+      if (application == null) {
+        LOG.info("Calling allocate on removed " +
+            "or non existant application " + appAttemptId);
+        return EMPTY_ALLOCATION;
+      }
-    // Make sure this application exists
-    FSAppAttempt application = getSchedulerApp(appAttemptId);
-    if (application == null) {
-      LOG.info("Calling allocate on removed " +
-          "or non existant application " + appAttemptId);
-      return EMPTY_ALLOCATION;
-    }
+      // Sanity check
+      SchedulerUtils.normalizeRequests(ask, DOMINANT_RESOURCE_CALCULATOR,
+          clusterResource, minimumAllocation, getMaximumResourceCapability(),
+          incrAllocation);
-    // Sanity check
-    SchedulerUtils.normalizeRequests(ask, DOMINANT_RESOURCE_CALCULATOR,
-        clusterResource, minimumAllocation, getMaximumResourceCapability(),
-        incrAllocation);
+      // Record container allocation start time
+      application.recordContainerRequestTime(getClock().getTime());
-    // Record container allocation start time
-    application.recordContainerRequestTime(getClock().getTime());
+      // Release containers
+      releaseContainers(release, application);
-    // Release containers
-    releaseContainers(release, application);
+      synchronized (application) {
+        if (!ask.isEmpty()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("allocate: pre-update" +
+                " applicationAttemptId=" + appAttemptId +
+                " application=" + application.getApplicationId());
+          }
+          application.showRequests();
-    synchronized (application) {
-      if (!ask.isEmpty()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("allocate: pre-update" +
-              " applicationAttemptId=" + appAttemptId +
-              " application=" + application.getApplicationId());
-        }
-        application.showRequests();
+          // Update application requests
+          application.updateResourceRequests(ask);
-        // Update application requests
-        application.updateResourceRequests(ask);
+          application.showRequests();
+        }
-        application.showRequests();
-      }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("allocate: post-update" +
+              " applicationAttemptId=" + appAttemptId +
+              " #ask=" + ask.size() +
+              " reservation= " + application.getCurrentReservation());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("allocate: post-update" +
-            " applicationAttemptId=" + appAttemptId +
-            " #ask=" + ask.size() +
-            " reservation= " + application.getCurrentReservation());
+          LOG.debug("Preempting " + application.getPreemptionContainers().size()
+              + " container(s)");
+        }
-      LOG.debug("Preempting " + application.getPreemptionContainers().size()
-          + " container(s)");
-      }
+        Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
+        for (RMContainer container : application.getPreemptionContainers()) {
+          preemptionContainerIds.add(container.getContainerId());
+        }
-    Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
-    for (RMContainer container : application.getPreemptionContainers()) {
-      preemptionContainerIds.add(container.getContainerId());
-    }
+        application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+        ContainersAndNMTokensAllocation allocation =
+            application.pullNewlyAllocatedContainersAndNMTokens();
-    application.updateBlacklist(blacklistAdditions, blacklistRemovals);
-    ContainersAndNMTokensAllocation allocation =
-        application.pullNewlyAllocatedContainersAndNMTokens();
+        // Record container allocation time
+        if (!(allocation.getContainerList().isEmpty())) {
+          application.recordContainerAllocationTime(getClock().getTime());
+        }
-    // Record container allocation time
-    if (!(allocation.getContainerList().isEmpty())) {
-      application.recordContainerAllocationTime(getClock().getTime());
+        Resource headroom = application.getHeadroom();
+        application.setApplicationHeadroomForMetrics(headroom);
+        return new Allocation(allocation.getContainerList(), headroom,
+            preemptionContainerIds, null, null, allocation.getNMTokenList());
     }
-
-    Resource headroom = application.getHeadroom();
-    application.setApplicationHeadroomForMetrics(headroom);
-    return new Allocation(allocation.getContainerList(), headroom,
-        preemptionContainerIds, null, null, allocation.getNMTokenList());
+    } finally {
+      SchedulerMetrics.getMetrics().updateAppAllocateDurationStat(getClock().getTime() - start);
    }
  }
 
@@ -1012,6 +1018,7 @@ private synchronized void nodeUpdate(RMNode nm) {
     long duration = getClock().getTime() - start;
     fsOpDurations.addNodeUpdateDuration(duration);
+    SchedulerMetrics.getMetrics().updateNodeUpdateDurationStat(duration);
   }
 
   void continuousSchedulingAttempt() throws InterruptedException {
@@ -1062,6 +1069,7 @@ public int compare(NodeId n1, NodeId n2) {
   @VisibleForTesting
   synchronized void attemptScheduling(FSSchedulerNode node) {
+    long start = getClock().getTime();
     if (rmContext.isWorkPreservingRecoveryEnabled()
         && !rmContext.isSchedulerReadyForAllocatingContainers()) {
       return;
@@ -1101,6 +1109,7 @@ synchronized void attemptScheduling(FSSchedulerNode node) {
       }
     }
     updateRootQueueMetrics();
+    SchedulerMetrics.getMetrics().updateSchedulingDurationStat(getClock().getTime() - start);
   }
 
   public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
@@ -1391,6 +1400,7 @@ private synchronized void startSchedulerThreads() {
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     initScheduler(conf);
+    SchedulerMetrics.initMetrics(this);
     super.serviceInit(conf);
   }
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
old mode 100644
new mode 100755
index 6b77ceb..2221932
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -77,6 +77,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -249,6 +250,7 @@ private synchronized void initScheduler(Configuration conf) {
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     initScheduler(conf);
+    SchedulerMetrics.initMetrics(this);
     super.serviceInit(conf);
   }
 
@@ -313,56 +315,61 @@ public synchronized void setRMContext(RMContext rmContext) {
   public Allocation allocate(
       ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
      List<ContainerId> release, List<String> blacklistAdditions,
      List<String> blacklistRemovals) {
-    FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
-    if (application == null) {
-      LOG.error("Calling allocate on removed " +
-          "or non existant application " + applicationAttemptId);
-      return EMPTY_ALLOCATION;
-    }
+    long start = getClock().getTime();
+    try {
+      FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
+      if (application == null) {
+        LOG.error("Calling allocate on removed " +
+            "or non existant application " + applicationAttemptId);
+        return EMPTY_ALLOCATION;
+      }
 
-    // Sanity check
-    SchedulerUtils.normalizeRequests(ask, resourceCalculator,
-        clusterResource, minimumAllocation, getMaximumResourceCapability());
+      // Sanity check
+      SchedulerUtils.normalizeRequests(ask, resourceCalculator,
+          clusterResource, minimumAllocation, getMaximumResourceCapability());
 
-    // Release containers
-    releaseContainers(release, application);
+      // Release containers
+      releaseContainers(release, application);
 
-    synchronized (application) {
+      synchronized (application) {
 
-      // make sure we aren't stopping/removing the application
-      // when the allocate comes in
-      if (application.isStopped()) {
-        LOG.info("Calling allocate on a stopped " +
-            "application " + applicationAttemptId);
-        return EMPTY_ALLOCATION;
-      }
+        // make sure we aren't stopping/removing the application
+        // when the allocate comes in
+        if (application.isStopped()) {
+          LOG.info("Calling allocate on a stopped " +
+              "application " + applicationAttemptId);
+          return EMPTY_ALLOCATION;
+        }
 
-      if (!ask.isEmpty()) {
-        LOG.debug("allocate: pre-update" +
-            " applicationId=" + applicationAttemptId +
-            " application=" + application);
-        application.showRequests();
+        if (!ask.isEmpty()) {
+          LOG.debug("allocate: pre-update" +
+              " applicationId=" + applicationAttemptId +
+              " application=" + application);
+          application.showRequests();
 
-        // Update application requests
-        application.updateResourceRequests(ask);
+          // Update application requests
+          application.updateResourceRequests(ask);
 
-        LOG.debug("allocate: post-update" +
-            " applicationId=" + applicationAttemptId +
-            " application=" + application);
-        application.showRequests();
+          LOG.debug("allocate: post-update" +
+              " applicationId=" + applicationAttemptId +
+              " application=" + application);
+          application.showRequests();
 
-        LOG.debug("allocate:" +
-            " applicationId=" + applicationAttemptId +
-            " #ask=" + ask.size());
-      }
+          LOG.debug("allocate:" +
+              " applicationId=" + applicationAttemptId +
+              " #ask=" + ask.size());
+        }
 
-      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
-      ContainersAndNMTokensAllocation allocation =
-          application.pullNewlyAllocatedContainersAndNMTokens();
-      Resource headroom = application.getHeadroom();
-      application.setApplicationHeadroomForMetrics(headroom);
-      return new Allocation(allocation.getContainerList(), headroom, null,
-          null, null, allocation.getNMTokenList());
+        application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+        ContainersAndNMTokensAllocation allocation =
+            application.pullNewlyAllocatedContainersAndNMTokens();
+        Resource headroom = application.getHeadroom();
+        application.setApplicationHeadroomForMetrics(headroom);
+        return new Allocation(allocation.getContainerList(), headroom, null,
+            null, null, allocation.getNMTokenList());
+      }
+    } finally {
+      SchedulerMetrics.getMetrics().updateAppAllocateDurationStat(getClock().getTime() - start);
     }
   }
 
@@ -404,7 +411,7 @@ public synchronized void addApplication(ApplicationId applicationId,
 
     if (transferStateFromPreviousAttempt) {
       schedulerApp.transferStateFromPreviousAttempt(application
-        .getCurrentAppAttempt());
+          .getCurrentAppAttempt());
     }
     application.setCurrentAppAttempt(schedulerApp);
 
@@ -476,6 +483,7 @@ private synchronized void doneApplicationAttempt(
    * @param node node on which resources are available to be allocated
    */
  private void assignContainers(FiCaSchedulerNode node) {
+    long start = getClock().getTime();
     LOG.debug("assignContainers:" +
         " node=" + node.getRMNode().getNodeAddress() +
         " #applications=" + applications.size());
@@ -532,6 +540,7 @@ private void assignContainers(FiCaSchedulerNode node) {
       }
       updateAppHeadRoom(attempt);
     }
+    SchedulerMetrics.getMetrics().updateSchedulingDurationStat(getClock().getTime() - start);
   }
 
   private int getMaxAllocatableContainers(FiCaSchedulerApp application,
@@ -713,46 +722,51 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application
   }
 
   private synchronized void nodeUpdate(RMNode rmNode) {
-    FiCaSchedulerNode node = getNode(rmNode.getNodeID());
-
-    List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
-    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
-    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
-    for(UpdatedContainerInfo containerInfo : containerInfoList) {
-      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
-      completedContainers.addAll(containerInfo.getCompletedContainers());
-    }
-    // Processing the newly launched containers
-    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
-      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
-    }
+    long start = getClock().getTime();
+    try {
+      FiCaSchedulerNode node = getNode(rmNode.getNodeID());
+
+      List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
+      List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
+      List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
+      for(UpdatedContainerInfo containerInfo : containerInfoList) {
+        newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
+        completedContainers.addAll(containerInfo.getCompletedContainers());
+      }
+      // Processing the newly launched containers
+      for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
+        containerLaunchedOnNode(launchedContainer.getContainerId(), node);
+      }
 
-    // Process completed containers
-    for (ContainerStatus completedContainer :
completedContainers) { + ContainerId containerId = completedContainer.getContainerId(); + LOG.debug("Container FINISHED: " + containerId); + completedContainer(getRMContainer(containerId), + completedContainer, RMContainerEventType.FINISHED); + } - if (rmContext.isWorkPreservingRecoveryEnabled() - && !rmContext.isSchedulerReadyForAllocatingContainers()) { - return; - } + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + return; + } - if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, - node.getAvailableResource(),minimumAllocation)) { - LOG.debug("Node heartbeat " + rmNode.getNodeID() + - " available resource = " + node.getAvailableResource()); + if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, + node.getAvailableResource(),minimumAllocation)) { + LOG.debug("Node heartbeat " + rmNode.getNodeID() + + " available resource = " + node.getAvailableResource()); - assignContainers(node); + assignContainers(node); - LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " - + node.getAvailableResource()); - } + LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " + + node.getAvailableResource()); + } - updateAvailableResourcesMetrics(); + updateAvailableResourcesMetrics(); + } finally { + SchedulerMetrics.getMetrics().updateNodeUpdateDurationStat(getClock().getTime() - start); + } } private void increaseUsedResources(RMContainer rmContainer) { @@ -761,7 +775,7 @@ private void increaseUsedResources(RMContainer rmContainer) { private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) { schedulerAttempt.setHeadroom(Resources.subtract(clusterResource, - usedResource)); + usedResource)); } private void updateAvailableResourcesMetrics() { @@ -903,9 +917,9 @@ protected synchronized void completedContainer(RMContainer rmContainer, // Update total usage Resources.subtractFrom(usedResource, container.getResource()); - LOG.info("Application attempt " + application.getApplicationAttemptId() + + LOG.info("Application attempt " + application.getApplicationAttemptId() + " released container " + container.getId() + - " on node: " + node + + " on node: " + node + " with event: " + event); }
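
To make the intended call pattern concrete, here is a minimal, hypothetical JUnit/Mockito sketch of how the new counters are driven, mirroring the ADDED/HANDLED calls wired into the ResourceManager dispatcher above. The test class name and the mock setup are illustrative assumptions, not part of this patch, and the moving-average extensions used by SchedulerMetrics (enableMovingAvg and getHistStatInfo on MutableStat/MutableRate) are assumed to come from a companion metrics2 change rather than stock Hadoop.

package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.junit.Test;

// Hypothetical test class, not part of this patch.
public class TestSchedulerMetricsSketch {

  @Test
  public void testWaitingEventGauges() {
    // The @Metric fields are injected when the source registers itself, so
    // bring the default metrics system up first.
    DefaultMetricsSystem.initialize("ResourceManager");

    // SchedulerMetrics only needs the root QueueMetrics and the cluster size
    // from the scheduler, so a mock is enough to drive the counter paths.
    YarnScheduler scheduler = mock(YarnScheduler.class);
    when(scheduler.getRootQueueMetrics()).thenReturn(
        QueueMetrics.forQueue("root", null, false, new Configuration()));
    when(scheduler.getNumClusterNodes()).thenReturn(1);
    SchedulerMetrics.initMetrics(scheduler);
    SchedulerMetrics metrics = SchedulerMetrics.getMetrics();

    // Producer side (SchedulerEventDispatcher.handle): count, then enqueue.
    SchedulerEvent event = new SchedulerEvent(SchedulerEventType.NODE_UPDATE);
    metrics.handle(event, SchedulerMetrics.SchedulerEventOp.ADDED);
    assertEquals(1, metrics.getNumWaitingEvents());
    assertEquals(1, metrics.getNumWaitingNodeUpdateEvents());

    // Consumer side (EventProcessor.run): counted as soon as the event is
    // taken off the dispatch queue.
    metrics.handle(event, SchedulerMetrics.SchedulerEventOp.HANDLED);
    assertEquals(0, metrics.getNumWaitingEvents());

    SchedulerMetrics.destroyMetrics();
    DefaultMetricsSystem.shutdown();
  }
}

With the 1 s update interval, the evaluateLoad() thresholds read, for example, as follows on a 2,000-node cluster: a moving average of fewer than 20 waiting events with a scheduling CPU ratio below 0.1 is LIGHT and below 0.6 is NORMAL; 20 to 200 waiting events is BUSY; and more than 200 is HEAVY.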