diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 61b7f36..dc6191c 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -59,6 +59,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
@@ -67,6 +68,7 @@
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
 import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper;
 import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
 import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
 import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
 import org.apache.hadoop.yarn.sls.utils.SLSUtils;
@@ -175,6 +177,9 @@ private void startRM() throws IOException, ClassNotFoundException {
     if(Class.forName(schedulerClass) == CapacityScheduler.class) {
       rmConf.set(YarnConfiguration.RM_SCHEDULER,
           SLSCapacityScheduler.class.getName());
+    } else if (Class.forName(schedulerClass) == FairScheduler.class) {
+      rmConf.set(YarnConfiguration.RM_SCHEDULER,
+          SLSFairScheduler.class.getName());
     } else {
       rmConf.set(YarnConfiguration.RM_SCHEDULER,
           ResourceSchedulerWrapper.class.getName());
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
new file mode 100644
index 0000000..53ee45c
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
@@ -0,0 +1,830 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.CsvReporter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.web.SLSWebApp;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+@Private
+@Unstable
+public class SLSFairScheduler extends FairScheduler implements
+    SchedulerWrapper, Configurable {
+  private static final String EOL = System.getProperty("line.separator");
+  private static final int SAMPLING_SIZE = 60;
+  private ScheduledExecutorService pool;
+  // counters for scheduler allocate/handle operations
+  private Counter schedulerAllocateCounter;
+  private Counter schedulerHandleCounter;
+  private Map<SchedulerEventType, Counter> schedulerHandleCounterMap;
+  // Timers for scheduler allocate/handle operations
+  private Timer schedulerAllocateTimer;
+  private Timer schedulerHandleTimer;
+  private Map<SchedulerEventType, Timer> schedulerHandleTimerMap;
+  private List<Histogram> schedulerHistogramList;
+  private Map<Histogram, Timer> histogramTimerMap;
+  private Lock samplerLock;
+  private Lock queueLock;
+
+  private Configuration conf;
+
+  private Map<ApplicationAttemptId, String> appQueueMap =
+      new ConcurrentHashMap<>();
+  private BufferedWriter jobRuntimeLogBW;
+
+  // Priority of the SLSFairScheduler shutdown hook.
+  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+  // web app
+  private SLSWebApp web;
+
+  private Map<ContainerId, Resource> preemptionContainerMap =
+      new ConcurrentHashMap<>();
+
+  // metrics
+  private MetricRegistry metrics;
+  private SchedulerMetrics schedulerMetrics;
+  private boolean metricsON;
+  private String metricsOutputDir;
+  private BufferedWriter metricsLogBW;
+  private boolean running = false;
+  private static Map<Class, Class> defaultSchedulerMetricsMap =
+      new HashMap<>();
+  static {
+    defaultSchedulerMetricsMap.put(FairScheduler.class,
+        FairSchedulerMetrics.class);
+    defaultSchedulerMetricsMap.put(FifoScheduler.class,
+        FifoSchedulerMetrics.class);
+  }
+  // must be set from outside
+  private Set<String> queueSet;
+  private Set<String> trackedAppSet;
+
+  public final Logger LOG = Logger.getLogger(SLSFairScheduler.class);
+
+  public SLSFairScheduler() {
+    samplerLock = new ReentrantLock();
+    queueLock = new ReentrantLock();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    super.setConfig(conf);
+    // start metrics
+    metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
+    if (metricsON) {
+      try {
+        initMetrics();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+
+    ShutdownHookManager.get().addShutdownHook(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          if (metricsLogBW != null) {
+            metricsLogBW.write("]");
+            metricsLogBW.close();
+          }
+          if (web != null) {
+            web.stop();
+          }
+          tearDown();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }, SHUTDOWN_HOOK_PRIORITY);
+  }
+
+  @Override
+  public Allocation allocate(ApplicationAttemptId attemptId,
+      List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
+      List<String> strings, List<String> strings2,
+      List<UpdateContainerRequest> increaseRequests,
+      List<UpdateContainerRequest> decreaseRequests) {
+    if (metricsON) {
+      final Timer.Context context = schedulerAllocateTimer.time();
+      Allocation allocation = null;
+      try {
+        allocation = super.allocate(attemptId, resourceRequests, containerIds,
+            strings, strings2, increaseRequests, decreaseRequests);
+        return allocation;
+      } finally {
+        context.stop();
+        schedulerAllocateCounter.inc();
+        try {
+          updateQueueWithAllocateRequest(allocation, attemptId,
+              resourceRequests, containerIds);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    } else {
+      return super.allocate(attemptId, resourceRequests, containerIds, strings,
+          strings2, increaseRequests, decreaseRequests);
+    }
+  }
+
+  @Override
+  public void handle(SchedulerEvent schedulerEvent) {
+    // metrics off
+    if (!metricsON) {
+      super.handle(schedulerEvent);
+      return;
+    }
+
+    if (!running) {
+      running = true;
+    }
+
+    // metrics on
+    Timer.Context handlerTimer = null;
+    Timer.Context operationTimer = null;
+
+    NodeUpdateSchedulerEventWrapper eventWrapper;
+    try {
+      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
+          && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
+        eventWrapper = new NodeUpdateSchedulerEventWrapper(
+            (NodeUpdateSchedulerEvent)schedulerEvent);
+        schedulerEvent = eventWrapper;
+        updateQueueWithNodeUpdate(eventWrapper);
+      } else if (schedulerEvent.getType() ==
+          SchedulerEventType.APP_ATTEMPT_REMOVED
+          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        // the attempt may still hold its AM container; if so, update the
+        // queue's resource usage before the attempt is removed
+        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+        ApplicationAttemptId appAttemptId =
+            appRemoveEvent.getApplicationAttemptID();
+        String queue = appQueueMap.get(appAttemptId);
+        SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId);
+        if (!app.getLiveContainers().isEmpty()) { // have 0 or 1
+          // should have one live container, which is the AM container
+          RMContainer rmc = app.getLiveContainers().iterator().next();
+          updateQueueMetrics(queue,
+              rmc.getContainer().getResource().getMemorySize(),
+              rmc.getContainer().getResource().getVirtualCores());
+        }
+      }
+
+      handlerTimer = schedulerHandleTimer.time();
+      operationTimer = schedulerHandleTimerMap
+          .get(schedulerEvent.getType()).time();
+
+      super.handle(schedulerEvent);
+    } finally {
+      if (handlerTimer != null) {
+        handlerTimer.stop();
+      }
+      if (operationTimer != null) {
+        operationTimer.stop();
+      }
+      schedulerHandleCounter.inc();
+      schedulerHandleCounterMap.get(schedulerEvent.getType()).inc();
+
+      if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
+          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        SLSRunner.decreaseRemainingApps();
+        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+        appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
+      } else if (schedulerEvent.getType() ==
+          SchedulerEventType.APP_ATTEMPT_ADDED
+          && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
+        AppAttemptAddedSchedulerEvent appAddEvent =
+            (AppAttemptAddedSchedulerEvent) schedulerEvent;
+        SchedulerApplication app =
+            applications.get(appAddEvent.getApplicationAttemptId()
+                .getApplicationId());
+        appQueueMap.put(appAddEvent.getApplicationAttemptId(),
+            app.getQueue().getQueueName());
+      }
+    }
+  }
+
+  private void updateQueueWithNodeUpdate(
+      NodeUpdateSchedulerEventWrapper eventWrapper) {
+    RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
+    List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
+    for (UpdatedContainerInfo info : containerList) {
+      for (ContainerStatus status : info.getCompletedContainers()) {
+        ContainerId containerId = status.getContainerId();
+        SchedulerAppReport app = super.getSchedulerAppInfo(
+            containerId.getApplicationAttemptId());
+
+        if (app == null) {
+          // this happens for the AM container:
+          // the app has already been removed by the time the NM sends the
+          // release information
+          continue;
+        }
+
+        String queue = appQueueMap.get(containerId.getApplicationAttemptId());
+        long releasedMemory = 0;
+        int releasedVCores = 0;
+        if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
+          for (RMContainer rmc : app.getLiveContainers()) {
+            if (rmc.getContainerId().equals(containerId)) {
+              releasedMemory += rmc.getContainer().getResource()
+                  .getMemorySize();
+              releasedVCores += rmc.getContainer()
+                  .getResource().getVirtualCores();
+              break;
+            }
+          }
+        } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
+          if (preemptionContainerMap.containsKey(containerId)) {
+            Resource preResource = preemptionContainerMap.get(containerId);
+            releasedMemory += preResource.getMemorySize();
+            releasedVCores += preResource.getVirtualCores();
+            preemptionContainerMap.remove(containerId);
+          }
+        }
+        // update queue counters
+        updateQueueMetrics(queue, releasedMemory, releasedVCores);
+      }
+    }
+  }
+
+  private void updateQueueWithAllocateRequest(Allocation allocation,
+      ApplicationAttemptId attemptId,
+      List<ResourceRequest> resourceRequests,
+      List<ContainerId> containerIds) throws IOException {
+    // update queue information
+    Resource pendingResource = Resources.createResource(0, 0);
+    Resource allocatedResource = Resources.createResource(0, 0);
+    String queueName = appQueueMap.get(attemptId);
+    // containers requested
+    for (ResourceRequest request : resourceRequests) {
+      if (request.getResourceName().equals(ResourceRequest.ANY)) {
+        Resources.addTo(pendingResource,
+            Resources.multiply(request.getCapability(),
+                request.getNumContainers()));
+      }
+    }
+    // containers allocated
+    for (Container container : allocation.getContainers()) {
+      Resources.addTo(allocatedResource, container.getResource());
+      Resources.subtractFrom(pendingResource, container.getResource());
+    }
+    // containers released by the AM
+    SchedulerAppReport report = super.getSchedulerAppInfo(attemptId);
+    for (ContainerId containerId : containerIds) {
+      Container container = null;
+      for (RMContainer c : report.getLiveContainers()) {
+        if (c.getContainerId().equals(containerId)) {
+          container = c.getContainer();
+          break;
+        }
+      }
+      if (container != null) {
+        // released allocated containers
+        Resources.subtractFrom(allocatedResource, container.getResource());
+      } else {
+        for (RMContainer c : report.getReservedContainers()) {
+          if (c.getContainerId().equals(containerId)) {
+            container = c.getContainer();
+            break;
+          }
+        }
+        if (container != null) {
+          // released reserved containers
+          Resources.subtractFrom(pendingResource, container.getResource());
+        }
+      }
+    }
+    // containers released/preempted by the scheduler
+    Set<ContainerId> preemptionContainers = new HashSet<>();
+    if (allocation.getContainerPreemptions() != null) {
+      preemptionContainers.addAll(allocation.getContainerPreemptions());
+    }
+    if (allocation.getStrictContainerPreemptions() != null) {
+      preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
+    }
+    if (!preemptionContainers.isEmpty()) {
+      for (ContainerId containerId : preemptionContainers) {
+        if (!preemptionContainerMap.containsKey(containerId)) {
+          Container container = null;
+          for (RMContainer c : report.getLiveContainers()) {
+            if (c.getContainerId().equals(containerId)) {
+              container = c.getContainer();
+              break;
+            }
+          }
+          if (container != null) {
+            preemptionContainerMap.put(containerId, container.getResource());
+          }
+        }
+      }
+    }
+
+    // update metrics
+    SortedMap<String, Counter> counterMap = metrics.getCounters();
+    String[] names = new String[]{
+        "counter.queue." + queueName + ".pending.memory",
+        "counter.queue." + queueName + ".pending.cores",
+        "counter.queue." + queueName + ".allocated.memory",
+        "counter.queue." + queueName + ".allocated.cores"};
+    long[] values = new long[]{pendingResource.getMemorySize(),
+        pendingResource.getVirtualCores(),
+        allocatedResource.getMemorySize(),
+        allocatedResource.getVirtualCores()};
+    for (int i = names.length - 1; i >= 0; i--) {
+      if (!counterMap.containsKey(names[i])) {
+        metrics.counter(names[i]);
+        counterMap = metrics.getCounters();
+      }
+      counterMap.get(names[i]).inc(values[i]);
+    }
+
+    queueLock.lock();
+    try {
+      if (!schedulerMetrics.isTracked(queueName)) {
+        schedulerMetrics.trackQueue(queueName);
+      }
+    } finally {
+      queueLock.unlock();
+    }
+  }
+
+  private void tearDown() throws IOException {
+    // close the job runtime writer
+    if (jobRuntimeLogBW != null) {
+      jobRuntimeLogBW.close();
+    }
+    // shut down the metrics thread pool
+    if (pool != null) {
+      pool.shutdown();
+    }
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private void initMetrics() throws Exception {
+    metrics = new MetricRegistry();
+    // configuration
+    metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR);
+    int metricsWebAddressPort = conf.getInt(
+        SLSConfiguration.METRICS_WEB_ADDRESS_PORT,
+        SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT);
+    // create SchedulerMetrics for the current scheduler
+    String schedulerMetricsType = conf.get(FairScheduler.class.getName());
+    Class schedulerMetricsClass = schedulerMetricsType == null ?
+        defaultSchedulerMetricsMap.get(FairScheduler.class) :
+        Class.forName(schedulerMetricsType);
+    schedulerMetrics = (SchedulerMetrics) ReflectionUtils
+        .newInstance(schedulerMetricsClass, new Configuration());
+    schedulerMetrics.init(this, metrics);
+
+    // register various metrics
+    registerJvmMetrics();
+    registerClusterResourceMetrics();
+    registerContainerAppNumMetrics();
+    registerSchedulerMetrics();
+
+    // .csv output
+    initMetricsCSVOutput();
+
+    // start web app to provide real-time tracking
+    web = new SLSWebApp(this, metricsWebAddressPort);
+    web.start();
+
+    // a thread to update histogram timer
+    pool = new ScheduledThreadPoolExecutor(2);
+    pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000,
+        TimeUnit.MILLISECONDS);
+
+    // a thread to output metrics for real-time tracking
+    pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000,
+        TimeUnit.MILLISECONDS);
+
+    // application running information
+    jobRuntimeLogBW =
+        new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
+            metricsOutputDir + "/jobruntime.csv"), "UTF-8"));
+    jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
+        "simulate_start_time,simulate_end_time" + EOL);
+    jobRuntimeLogBW.flush();
+  }
+
+  private void registerJvmMetrics() {
+    // add JVM gauges
+    metrics.register("variable.jvm.free.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            return Runtime.getRuntime().freeMemory();
+          }
+        }
+    );
+    metrics.register("variable.jvm.max.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            return Runtime.getRuntime().maxMemory();
+          }
+        }
+    );
+    metrics.register("variable.jvm.total.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            return Runtime.getRuntime().totalMemory();
+          }
+        }
+    );
+  }
+
+  private void registerClusterResourceMetrics() {
+    metrics.register("variable.cluster.allocated.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            if (getRootQueueMetrics() == null) {
+              return 0L;
+            } else {
+              return getRootQueueMetrics().getAllocatedMB();
+            }
+          }
+        }
+    );
+    metrics.register("variable.cluster.allocated.vcores",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            if (getRootQueueMetrics() == null) {
+              return 0;
+            } else {
+              return getRootQueueMetrics().getAllocatedVirtualCores();
+            }
+          }
+        }
+    );
+    metrics.register("variable.cluster.available.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            if (getRootQueueMetrics() == null) {
+              return 0L;
+            } else {
+              return getRootQueueMetrics().getAvailableMB();
+            }
+          }
+        }
+    );
+    metrics.register("variable.cluster.available.vcores",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            if (getRootQueueMetrics() == null) {
+              return 0;
+            } else {
+              return getRootQueueMetrics().getAvailableVirtualCores();
+            }
+          }
+        }
+    );
+  }
+
+  private void registerContainerAppNumMetrics() {
+    metrics.register("variable.running.application",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            if (getRootQueueMetrics() == null) {
+              return 0;
+            } else {
+              return getRootQueueMetrics().getAppsRunning();
+            }
+          }
+        }
+    );
+    metrics.register("variable.running.container",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            if (getRootQueueMetrics() == null) {
+              return 0;
+            } else {
+              return getRootQueueMetrics().getAllocatedContainers();
+            }
+          }
+        }
+    );
+  }
+
+  private void registerSchedulerMetrics() {
+    samplerLock.lock();
+    try {
+      // counters for scheduler operations
+      schedulerAllocateCounter = metrics.counter(
+          "counter.scheduler.operation.allocate");
+      schedulerHandleCounter = metrics.counter(
+          "counter.scheduler.operation.handle");
+      schedulerHandleCounterMap = new HashMap<>();
+      for (SchedulerEventType e : SchedulerEventType.values()) {
+        Counter counter = metrics.counter(
+            "counter.scheduler.operation.handle." + e);
+        schedulerHandleCounterMap.put(e, counter);
+      }
+      // timers for scheduler operations
+      int timeWindowSize = conf.getInt(
+          SLSConfiguration.METRICS_TIMER_WINDOW_SIZE,
+          SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT);
+      schedulerAllocateTimer = new Timer(
+          new SlidingWindowReservoir(timeWindowSize));
+      schedulerHandleTimer = new Timer(
+          new SlidingWindowReservoir(timeWindowSize));
+      schedulerHandleTimerMap = new HashMap<>();
+      for (SchedulerEventType e : SchedulerEventType.values()) {
+        Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize));
+        schedulerHandleTimerMap.put(e, timer);
+      }
+      // histogram for scheduler operations (Samplers)
+      schedulerHistogramList = new ArrayList<>();
+      histogramTimerMap = new HashMap<>();
+      Histogram schedulerAllocateHistogram = new Histogram(
+          new SlidingWindowReservoir(SAMPLING_SIZE));
+      metrics.register("sampler.scheduler.operation.allocate.timecost",
+          schedulerAllocateHistogram);
+      schedulerHistogramList.add(schedulerAllocateHistogram);
+      histogramTimerMap.put(schedulerAllocateHistogram,
+          schedulerAllocateTimer);
+      Histogram schedulerHandleHistogram = new Histogram(
+          new SlidingWindowReservoir(SAMPLING_SIZE));
+      metrics.register("sampler.scheduler.operation.handle.timecost",
+          schedulerHandleHistogram);
+      schedulerHistogramList.add(schedulerHandleHistogram);
+      histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer);
+      for (SchedulerEventType e : SchedulerEventType.values()) {
+        Histogram histogram = new Histogram(
+            new SlidingWindowReservoir(SAMPLING_SIZE));
+        metrics.register(
+            "sampler.scheduler.operation.handle." + e + ".timecost",
+            histogram);
+        schedulerHistogramList.add(histogram);
+        histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e));
+      }
+    } finally {
+      samplerLock.unlock();
+    }
+  }
+
+  private void initMetricsCSVOutput() {
+    int timeIntervalMS = conf.getInt(
+        SLSConfiguration.METRICS_RECORD_INTERVAL_MS,
+        SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
+    File dir = new File(metricsOutputDir + "/metrics");
+    if (!dir.exists() && !dir.mkdirs()) {
+      LOG.error("Cannot create directory " + dir.getAbsoluteFile());
+    }
+    final CsvReporter reporter = CsvReporter.forRegistry(metrics)
+        .formatFor(Locale.US)
+        .convertRatesTo(TimeUnit.SECONDS)
+        .convertDurationsTo(TimeUnit.MILLISECONDS)
+        .build(new File(metricsOutputDir + "/metrics"));
+    reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS);
+  }
+
+  class HistogramsRunnable implements Runnable {
+    @Override
+    public void run() {
+      samplerLock.lock();
+      try {
+        for (Histogram histogram : schedulerHistogramList) {
+          Timer timer = histogramTimerMap.get(histogram);
+          histogram.update((int) timer.getSnapshot().getMean());
+        }
+      } finally {
+        samplerLock.unlock();
+      }
+    }
+  }
+
+  class MetricsLogRunnable implements Runnable {
+    private boolean firstLine = true;
+
+    public MetricsLogRunnable() {
+      try {
+        metricsLogBW =
+            new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
+                metricsOutputDir + "/realtimetrack.json"), "UTF-8"));
+        metricsLogBW.write("[");
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void run() {
+      if (running) {
+        // ask the web app for the real-time tracking json
+        String metrics = web.generateRealTimeTrackingMetrics();
+        // output
+        try {
+          if (firstLine) {
+            metricsLogBW.write(metrics + EOL);
+            firstLine = false;
+          } else {
+            metricsLogBW.write("," + metrics + EOL);
+          }
+          metricsLogBW.flush();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  // the following functions are used by AMSimulator
+  public void addAMRuntime(ApplicationId appId,
+      long traceStartTimeMS, long traceEndTimeMS,
+      long simulateStartTimeMS, long simulateEndTimeMS) {
+    if (metricsON) {
+      try {
+        // write job runtime information
+        StringBuilder sb = new StringBuilder();
+        sb.append(appId).append(",").append(traceStartTimeMS).append(",")
+            .append(traceEndTimeMS).append(",").append(simulateStartTimeMS)
+            .append(",").append(simulateEndTimeMS);
+        jobRuntimeLogBW.write(sb.toString() + EOL);
+        jobRuntimeLogBW.flush();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  private void updateQueueMetrics(String queue,
+      long releasedMemory, int releasedVCores) {
+    // update queue counters
+    SortedMap<String, Counter> counterMap = metrics.getCounters();
+    if (releasedMemory != 0) {
+      String name = "counter.queue." + queue + ".allocated.memory";
+      if (!counterMap.containsKey(name)) {
+        metrics.counter(name);
+        counterMap = metrics.getCounters();
+      }
+      counterMap.get(name).inc(-releasedMemory);
+    }
+    if (releasedVCores != 0) {
+      String name = "counter.queue." + queue + ".allocated.cores";
+      if (!counterMap.containsKey(name)) {
+        metrics.counter(name);
+        counterMap = metrics.getCounters();
+      }
+      counterMap.get(name).inc(-releasedVCores);
+    }
+  }
+
+  private void initQueueMetrics(FSQueue queue) {
+    if (queue instanceof FSLeafQueue) {
+      SortedMap<String, Counter> counterMap = metrics.getCounters();
+      String queueName = queue.getQueueName();
+      String[] names = new String[]{
+          "counter.queue." + queueName + ".pending.memory",
+          "counter.queue." + queueName + ".pending.cores",
+          "counter.queue." + queueName + ".allocated.memory",
+          "counter.queue." + queueName + ".allocated.cores"};
+
+      for (int i = names.length - 1; i >= 0; i--) {
+        if (!counterMap.containsKey(names[i])) {
+          metrics.counter(names[i]);
+          counterMap = metrics.getCounters();
+        }
+      }
+
+      queueLock.lock();
+      try {
+        if (!schedulerMetrics.isTracked(queueName)) {
+          schedulerMetrics.trackQueue(queueName);
+        }
+      } finally {
+        queueLock.unlock();
+      }
+
+      return;
+    }
+
+    for (FSQueue child : queue.getChildQueues()) {
+      initQueueMetrics(child);
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+
+    initQueueMetrics(getQueueManager().getRootQueue());
+  }
+
+  public void setQueueSet(Set<String> queues) {
+    this.queueSet = queues;
+  }
+
+  public Set<String> getQueueSet() {
+    return this.queueSet;
+  }
+
+  public void setTrackedAppSet(Set<String> apps) {
+    this.trackedAppSet = apps;
+  }
+
+  public Set<String> getTrackedAppSet() {
+    return this.trackedAppSet;
+  }
+
+  public MetricRegistry getMetrics() {
+    return metrics;
+  }
+
+  public SchedulerMetrics getSchedulerMetrics() {
+    return schedulerMetrics;
+  }
+
+  // API open to other classes
+  public void addTrackedApp(ApplicationAttemptId appAttemptId,
+      String oldAppId) {
+    if (metricsON) {
+      schedulerMetrics.trackApp(appAttemptId, oldAppId);
+    }
+  }
+
+  public void removeTrackedApp(ApplicationAttemptId appAttemptId,
+      String oldAppId) {
+    if (metricsON) {
+      schedulerMetrics.untrackApp(appAttemptId, oldAppId);
+    }
+  }
+}
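
Usage note (not part of the patch): a minimal sketch of how this wrapper is expected to be exercised, using only names that appear in the diff above; the class SLSFairSchedulerExample and its helper methods are illustrative, not Hadoop API. Requesting the plain FairScheduler via yarn.resourcemanager.scheduler.class (YarnConfiguration.RM_SCHEDULER) is what makes SLSRunner.startRM() substitute SLSFairScheduler, and the per-queue counters the wrapper registers can later be read back from the Codahale MetricRegistry returned by getMetrics().

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;

public final class SLSFairSchedulerExample {

  // The RM configuration that makes SLSRunner.startRM() pick SLSFairScheduler:
  // the user asks for the plain FairScheduler and SLS swaps in the wrapper.
  static Configuration fairSchedulerRMConf() {
    Configuration rmConf = new YarnConfiguration();
    rmConf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
    return rmConf;
  }

  // Reading one of the per-queue counters maintained by
  // updateQueueWithAllocateRequest()/updateQueueMetrics(), e.g. from the
  // registry returned by SLSFairScheduler#getMetrics().
  static long allocatedMemory(MetricRegistry registry, String queueName) {
    Counter counter = registry.getCounters()
        .get("counter.queue." + queueName + ".allocated.memory");
    return counter == null ? 0L : counter.getCount();
  }

  private SLSFairSchedulerExample() {
  }
}

Looking up counters by their registry names mirrors how the CSV reporter and the SLS web app consume the same MetricRegistry, which is presumably why the "counter.queue.<queue>.*" naming scheme is kept identical across updateQueueWithAllocateRequest(), updateQueueMetrics() and initQueueMetrics().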