diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7dc9e95..1ef81ad 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -329,6 +329,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION.varname); + llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WEB_PORT.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WEB_SSL.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); @@ -2726,6 +2727,13 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " to a location other than the ones requested. Set to -1 for an infinite delay, 0" + "for a no delay. Currently these are the only two supported values" ), + LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS( + "hive.llap.daemon.task.preemption.metrics.intervals", "30,60,300", + "Comma-delimited set of integers denoting the desired rollover intervals (in seconds)\n" + + " for percentile latency metrics. Used by LLAP daemon task scheduler metrics for\n" + + " time taken to kill task (due to pre-emption) and useful time wasted by the task that\n" + + " is about to be preempted." + ), LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE("hive.llap.daemon.task.scheduler.wait.queue.size", 10, "LLAP scheduler maximum queue size.", "llap.daemon.task.scheduler.wait.queue.size"), LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME( diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index e80fb15..c8718c3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -106,7 +106,7 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi String waitQueueSchedulerClassName = HiveConf.getVar( conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME); this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, - waitQueueSchedulerClassName, enablePreemption, classLoader); + waitQueueSchedulerClassName, enablePreemption, classLoader, metrics); addIfService(executorService); @@ -219,7 +219,6 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws .build(); } metrics.incrExecutorTotalRequestsHandled(); - metrics.incrExecutorNumQueuedRequests(); } finally { NDC.pop(); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 33b41e8..6701190 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -193,7 +193,22 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor String displayName = "LlapDaemonExecutorMetrics-" + MetricsUtils.getHostName(); String sessionId = MetricsUtils.getUUID(); daemonConf.set("llap.daemon.metrics.sessionid", sessionId); - this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors); + String[] strIntervals = HiveConf.getTrimmedStringsVar(daemonConf, + HiveConf.ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS); + int[] intervals = null; + if (strIntervals != null) { + intervals = new int[strIntervals.length]; + try { + for (int i = 0; i < strIntervals.length; i++) { + intervals[i] = Integer.valueOf(strIntervals[i]); + } + } catch (NumberFormatException e) { + LOG.warn("Ignoring {} because of invalid values", + HiveConf.ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS); + // ignore + } + } + this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors, intervals); this.metrics.setMemoryPerInstance(executorMemoryBytes); this.metrics.setCacheMemoryPerInstance(ioMemoryBytes); this.metrics.setJvmMaxMemory(maxJvmMemory); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 57dd828..5ad5c8c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.service.AbstractService; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; @@ -104,10 +105,11 @@ final ConcurrentMap knownTasks = new ConcurrentHashMap<>(); private final Object lock = new Object(); + private LlapDaemonExecutorMetrics metrics; public TaskExecutorService(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption, - ClassLoader classLoader) { + ClassLoader classLoader, final LlapDaemonExecutorMetrics metrics) { super(TaskExecutorService.class.getSimpleName()); LOG.info("TaskExecutorService is being setup with parameters: " + "numExecutors=" + numExecutors @@ -127,6 +129,7 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, new PreemptionQueueComparator()); this.enablePreemption = enablePreemption; this.numSlotsAvailable = new AtomicInteger(numExecutors); + this.metrics = metrics; // single threaded scheduler for tasks from wait queue to executor threads ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() @@ -392,11 +395,17 @@ public SubmissionState schedule(TaskRunnerCallable task) { LOG.info("{} evicted from wait queue in favor of {} because of lower priority", evictedTask.getRequestId(), task.getRequestId()); } + if (metrics != null) { + metrics.incrTotalEvictedFromWaitQueue(); + } } synchronized (lock) { lock.notify(); } + if (metrics != null) { + metrics.setExecutorNumQueuedRequests(waitQueue.size()); + } return result; } @@ -560,6 +569,9 @@ public void onSuccess(TaskRunner2Result result) { taskWrapper.maybeUnregisterForFinishedStateNotifications(); taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result); updatePreemptionListAndNotify(result.getEndReason()); + if (metrics != null) { + metrics.setExecutorNumQueuedRequests(waitQueue.size()); + } } @Override @@ -570,6 +582,9 @@ public void onFailure(Throwable t) { taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t); updatePreemptionListAndNotify(null); LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t)); + if (metrics != null) { + metrics.setExecutorNumQueuedRequests(waitQueue.size()); + } } private void updatePreemptionListAndNotify(EndReason reason) { diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index a1cfbb8..1a5ce64 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -417,6 +417,7 @@ public void onSuccess(TaskRunner2Result result) { break; case CONTAINER_STOP_REQUESTED: LOG.info("Received container stop request (AM preemption) for {}", requestId); + metrics.incrExecutorTotalKilled(); break; case KILL_REQUESTED: LOG.info("Killed task {}", requestId); @@ -424,8 +425,9 @@ public void onSuccess(TaskRunner2Result result) { killtimerWatch.stop(); long elapsed = killtimerWatch.elapsedMillis(); LOG.info("Time to die for task {}", elapsed); + metrics.addMetricsPreemptionTimeToKill(elapsed); } - metrics.incrPreemptionTimeLost(runtimeWatch.elapsedMillis()); + metrics.addMetricsPreemptionTimeLost(runtimeWatch.elapsedMillis()); metrics.incrExecutorTotalKilled(); break; case COMMUNICATION_FAILURE: @@ -448,7 +450,6 @@ public void onSuccess(TaskRunner2Result result) { request.getFragmentSpec().getFragmentNumber(), request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, taskRunnerCallable.startTime, true); - metrics.decrExecutorNumQueuedRequests(); } @Override @@ -466,9 +467,6 @@ public void onFailure(Throwable t) { request.getFragmentSpec().getFragmentNumber(), request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, taskRunnerCallable.startTime, false); - if (metrics != null) { - metrics.decrExecutorNumQueuedRequests(); - } } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java index 941d926..2f32be3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java @@ -26,7 +26,7 @@ */ public enum LlapDaemonExecutorInfo implements MetricsInfo { ExecutorMetrics("Llap daemon cache related metrics"), - ExecutorThreadCountPerInstance("Total number of executor threads per node"), + ExecutorNumExecutorsPerInstance("Total number of executor threads per node"), ExecutorMemoryPerInstance("Total memory for executors per node in bytes"), ExecutorCacheMemoryPerInstance("Total Cache memory per node in bytes"), ExecutorJvmMaxMemory("Max memory available for JVM in bytes"), @@ -37,10 +37,16 @@ ExecutorTotalRequestsHandled("Total number of requests handled by the container"), ExecutorNumQueuedRequests("Number of requests queued by the container for processing"), ExecutorTotalSuccess("Total number of requests handled by the container that succeeded"), - ExecutorTotalExecutionFailure("Total number of requests handled by the container that failed execution"), - ExecutorTotalInterrupted("Total number of requests handled by the container that got interrupted"), + ExecutorTotalFailed("Total number of requests handled by the container that failed execution"), + ExecutorTotalKilled("Total number of requests handled by the container that got interrupted"), ExecutorTotalAskedToDie("Total number of requests handled by the container that were asked to die"), - PreemptionTimeLost("Total time lost due to task preemptions"); + ExecutorTotalPreemptionTimeToKill("Total amount of time taken for killing tasks due to pre-emption"), + ExecutorTotalPreemptionTimeLost("Total useful cluster time lost because of pre-emption"), + ExecutorPercentileTimeToKill("Percentile time to kill for pre-empted tasks"), + ExecutorPercentileTimeLost("Percentile cluster time wasted due to pre-emption"), + ExecutorMaxPreemptionTimeToKill("Max time for killing pre-empted task"), + ExecutorMaxPreemptionTimeLost("Max cluster time lost due to pre-emption"), + ExecutorTotalEvictedFromWaitQueue("Total number of tasks evicted from wait queue because of low priority"); private final String desc; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java index 894880f..fc8d6d9 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java @@ -19,20 +19,23 @@ import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorCacheMemoryPerInstance; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorJvmMaxMemory; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeLost; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeToKill; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMemoryPerInstance; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumQueuedRequests; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorRpcNumHandlers; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadCPUTime; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadCountPerInstance; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsPerInstance; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadUserTime; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalAskedToDie; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalExecutionFailure; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalInterrupted; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalEvictedFromWaitQueue; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalFailed; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalKilled; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalSuccess; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMetrics; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeLost; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeToKill; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorWaitQueueSize; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.PreemptionTimeLost; import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; @@ -54,6 +57,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; import org.apache.hadoop.metrics2.source.JvmMetrics; /** @@ -70,13 +74,17 @@ private final ThreadMXBean threadMXBean; private final Map cpuMetricsInfoMap; private final Map userMetricsInfoMap; + private long maxTimeLost = Long.MIN_VALUE; + private long maxTimeToKill = Long.MIN_VALUE; final MutableGaugeLong[] executorThreadCpuTime; final MutableGaugeLong[] executorThreadUserTime; @Metric MutableCounterLong executorTotalRequestHandled; @Metric - MutableCounterLong executorNumQueuedRequests; + MutableGaugeInt executorNumQueuedRequests; + @Metric + MutableCounterLong totalEvictedFromWaitQueue; @Metric MutableCounterLong executorTotalSuccess; @Metric @@ -84,8 +92,6 @@ @Metric MutableCounterLong executorTotalExecutionFailed; @Metric - MutableCounterLong preemptionTimeLost; - @Metric MutableGaugeLong cacheMemoryPerInstance; @Metric MutableGaugeLong memoryPerInstance; @@ -95,9 +101,21 @@ MutableGaugeInt waitQueueSize; @Metric MutableGaugeInt rpcNumHandlers; + @Metric + MutableCounterLong totalPreemptionTimeToKill; + @Metric + MutableCounterLong totalPreemptionTimeLost; + @Metric + MutableGaugeLong maxPreemptionTimeToKill; + @Metric + MutableGaugeLong maxPreemptionTimeLost; + @Metric + final MutableQuantiles[] percentileTimeToKill; + @Metric + final MutableQuantiles[] percentileTimeLost; private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId, - int numExecutors) { + int numExecutors, final int[] intervals) { this.name = displayName; this.jvmMetrics = jm; this.sessionId = sessionId; @@ -110,6 +128,21 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sess this.cpuMetricsInfoMap = new ConcurrentHashMap<>(); this.userMetricsInfoMap = new ConcurrentHashMap<>(); + final int len = intervals == null ? 0 : intervals.length; + this.percentileTimeToKill = new MutableQuantiles[len]; + this.percentileTimeLost = new MutableQuantiles[len]; + for (int i=0; i maxTimeLost) { + maxTimeLost = value; + maxPreemptionTimeLost.set(maxTimeLost); + } + + for (MutableQuantiles q : percentileTimeLost) { + q.add(value); + } + } + + public void addMetricsPreemptionTimeToKill(long value) { + totalPreemptionTimeToKill.incr(value); + + if (value > maxTimeToKill) { + maxTimeToKill = value; + maxPreemptionTimeToKill.set(maxTimeToKill); + } + + for (MutableQuantiles q : percentileTimeToKill) { + q.add(value); + } } public void incrExecutorTotalKilled() { @@ -191,17 +246,29 @@ private void getExecutorStats(MetricsRecordBuilder rb) { updateThreadMetrics(rb); rb.addCounter(ExecutorTotalRequestsHandled, executorTotalRequestHandled.value()) - .addCounter(ExecutorNumQueuedRequests, executorNumQueuedRequests.value()) .addCounter(ExecutorTotalSuccess, executorTotalSuccess.value()) - .addCounter(ExecutorTotalExecutionFailure, executorTotalExecutionFailed.value()) - .addCounter(ExecutorTotalInterrupted, executorTotalIKilled.value()) - .addCounter(PreemptionTimeLost, preemptionTimeLost.value()) - .addGauge(ExecutorThreadCountPerInstance, numExecutors) + .addCounter(ExecutorTotalFailed, executorTotalExecutionFailed.value()) + .addCounter(ExecutorTotalKilled, executorTotalIKilled.value()) + .addCounter(ExecutorTotalEvictedFromWaitQueue, totalEvictedFromWaitQueue.value()) + .addGauge(ExecutorNumQueuedRequests, executorNumQueuedRequests.value()) + .addGauge(ExecutorNumExecutorsPerInstance, numExecutors) .addGauge(ExecutorMemoryPerInstance, memoryPerInstance.value()) .addGauge(ExecutorCacheMemoryPerInstance, cacheMemoryPerInstance.value()) .addGauge(ExecutorJvmMaxMemory, jvmMaxMemory.value()) .addGauge(ExecutorWaitQueueSize, waitQueueSize.value()) - .addGauge(ExecutorRpcNumHandlers, rpcNumHandlers.value()); + .addGauge(ExecutorRpcNumHandlers, rpcNumHandlers.value()) + .addCounter(ExecutorTotalPreemptionTimeToKill, totalPreemptionTimeToKill.value()) + .addCounter(ExecutorTotalPreemptionTimeLost, totalPreemptionTimeLost.value()) + .addGauge(ExecutorMaxPreemptionTimeToKill, maxPreemptionTimeToKill.value()) + .addGauge(ExecutorMaxPreemptionTimeLost, maxPreemptionTimeLost.value()); + + for (MutableQuantiles q : percentileTimeToKill) { + q.snapshot(rb, true); + } + + for (MutableQuantiles q : percentileTimeLost) { + q.snapshot(rb, true); + } } private void updateThreadMetrics(MetricsRecordBuilder rb) { diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index d1edd12..506f611 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -190,7 +190,7 @@ public void testWaitQueuePreemption() throws InterruptedException { public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption) { super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), null); } private ConcurrentMap completionListeners = new ConcurrentHashMap<>(); diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index b57ae1a..9ff9d87 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -54,10 +54,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; +import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics; +import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -163,6 +167,9 @@ public int compare(Priority o1, Priority o2) { @VisibleForTesting StatsPerDag dagStats = new StatsPerDag(); + private final LlapTaskSchedulerMetrics metrics; + private final JvmPauseMonitor pauseMonitor; + public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { this(taskSchedulerContext, new SystemClock()); } @@ -218,6 +225,22 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build()); schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw); + // Initialize the metrics system + LlapMetricsSystem.initialize("LlapDaemon"); + this.pauseMonitor = new JvmPauseMonitor(conf); + pauseMonitor.start(); + String displayName = "LlapTaskSchedulerMetrics-" + MetricsUtils.getHostName(); + String sessionId = conf.get("llap.daemon.metrics.sessionid"); + // TODO: Not sure about the use of this. Should we instead use workerIdentity as sessionId? + this.metrics = LlapTaskSchedulerMetrics.create(displayName, sessionId); + this.metrics.setNumExecutors(executorsPerInstance); + this.metrics.setMemoryPerInstance(memoryPerInstance * 1024L * 1024L); + this.metrics.setCpuCoresPerInstance(coresPerExecutor); + this.metrics.setSchedulableTasksPerInstance(numSchedulableTasksPerNode); + this.metrics.setLocalityDelay(localityDelayMs); + // TODO: Set this to proper value after HIVE-12959 + this.metrics.setAliveTimeout(60000); + this.metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance=" + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor @@ -262,7 +285,8 @@ public void onFailure(Throwable t) { registry.registerStateChangeListener(new NodeStateChangeListener()); activeInstances = registry.getInstances(); for (ServiceInstance inst : activeInstances.getAll().values()) { - addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode)); + addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode, + metrics)); } } finally { writeLock.unlock(); @@ -275,14 +299,14 @@ public void onFailure(Throwable t) { @Override public void onCreate(final ServiceInstance serviceInstance) { addNode(serviceInstance, new NodeInfo(serviceInstance, nodeBlacklistConf, clock, - numSchedulableTasksPerNode)); + numSchedulableTasksPerNode, metrics)); LOG.info("Added node with identity: {}", serviceInstance.getWorkerIdentity()); } @Override public void onUpdate(final ServiceInstance serviceInstance) { instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance, - nodeBlacklistConf, clock, numSchedulableTasksPerNode)); + nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics)); LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity()); } @@ -291,6 +315,7 @@ public void onRemove(final ServiceInstance serviceInstance) { // FIXME: disabling this for now // instanceToNodeMap.remove(serviceInstance.getWorkerIdentity()); LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity()); + metrics.setClusterNodeCount(activeInstances.getAll().size()); } } @@ -314,6 +339,15 @@ public void shutdown() { if (registry != null) { registry.stop(); } + + if (pauseMonitor != null) { + pauseMonitor.stop(); + } + + if (metrics != null) { + LlapMetricsSystem.shutdown(); + } + } } finally { writeLock.unlock(); @@ -390,6 +424,7 @@ public void dagComplete() { // This is effectively DAG completed, and can be used to reset statistics being tracked. LOG.info("DAG: " + dagCounter.get() + " completed. Scheduling stats: " + dagStats); dagCounter.incrementAndGet(); + metrics.incrCompletedDagCount(); dagStats = new StatsPerDag(); } @@ -480,9 +515,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd // Also reset commFailures since a task was able to communicate back and indicate success. nodeInfo.enableNode(); // Re-insert into the queue to force the poll thread to remove the element. - if ( disabledNodesQueue.remove(nodeInfo)) { - disabledNodesQueue.add(nodeInfo); - } + reinsertNodeInfo(nodeInfo); } // In case of success, trigger a scheduling run for pending tasks. trySchedulingPendingTasks(); @@ -498,9 +531,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd // Also reset commFailures since a task was able to communicate back and indicate success. nodeInfo.enableNode(); // Re-insert into the queue to force the poll thread to remove the element. - if (disabledNodesQueue.remove(nodeInfo)) { - disabledNodesQueue.add(nodeInfo); - } + reinsertNodeInfo(nodeInfo); } // In case of success, trigger a scheduling run for pending tasks. trySchedulingPendingTasks(); @@ -535,6 +566,13 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd return true; } + private void reinsertNodeInfo(final NodeInfo nodeInfo) { + if ( disabledNodesQueue.remove(nodeInfo)) { + disabledNodesQueue.add(nodeInfo); + } + metrics.setDisabledNodeCount(disabledNodesQueue.size()); + } + @Override public Object deallocateContainer(ContainerId containerId) { LOG.debug("Ignoring deallocateContainer for containerId: " + containerId); @@ -647,7 +685,8 @@ private void scanForNodeChanges() { if (inst.isAlive() && instanceToNodeMap.containsKey(inst.getWorkerIdentity()) == false) { /* that's a good node, not added to the allocations yet */ LOG.info("Found a new node: " + inst + "."); - addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode)); + addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode, + metrics)); } } } finally { @@ -658,6 +697,7 @@ private void scanForNodeChanges() { private void addNode(ServiceInstance inst, NodeInfo node) { LOG.info("Adding node: " + inst); instanceToNodeMap.put(inst.getWorkerIdentity(), node); + metrics.setClusterNodeCount(activeInstances.getAll().size()); // Trigger scheduling since a new node became available. trySchedulingPendingTasks(); } @@ -690,6 +730,7 @@ private void disableInstance(ServiceInstance instance, boolean isCommFailure) { nodeInfo.disableNode(isCommFailure); // TODO: handle task to container map events in case of hard failures disabledNodesQueue.add(nodeInfo); + metrics.setDisabledNodeCount(disabledNodesQueue.size()); } } finally { writeLock.unlock(); @@ -706,11 +747,20 @@ private void addPendingTask(TaskInfo taskInfo) { } tasksAtPriority.add(taskInfo); knownTasks.putIfAbsent(taskInfo.task, taskInfo); + updatePendingTasksMetrics(); } finally { writeLock.unlock(); } } + private void updatePendingTasksMetrics() { + int count = 0; + for(List taskInfos : pendingTasks.values()) { + count += taskInfos.size(); + } + metrics.setPendingTasksCount(count); + } + /* Remove a task from the pending list */ private void removePendingTask(TaskInfo taskInfo) { writeLock.lock(); @@ -721,6 +771,7 @@ private void removePendingTask(TaskInfo taskInfo) { LOG.warn("Could not find task: " + taskInfo.task + " in pending list, at priority: " + priority); } + updatePendingTasksMetrics(); } finally { writeLock.unlock(); } @@ -737,11 +788,20 @@ private void registerRunningTask(TaskInfo taskInfo) { runningTasks.put(priority, tasksAtpriority); } tasksAtpriority.add(taskInfo); + updateRunningTasksMetrics(); } finally { writeLock.unlock(); } } + private void updateRunningTasksMetrics() { + int count = 0; + for(TreeSet taskInfos : runningTasks.values()) { + count += taskInfos.size(); + } + metrics.setRunningTasksCount(count); + } + /* Unregister a task from the known and running structures */ private TaskInfo unregisterTask(Object task) { writeLock.lock(); @@ -762,6 +822,7 @@ private TaskInfo unregisterTask(Object task) { } else { LOG.warn("Could not find TaskInfo for task: {}. Not removing it from the running set", task); } + updateRunningTasksMetrics(); return taskInfo; } finally { writeLock.unlock(); @@ -871,6 +932,7 @@ protected void schedulePendingTasks() { break; } } + updatePendingTasksMetrics(); } finally { writeLock.unlock(); } @@ -950,6 +1012,7 @@ private void preemptTasks(int forPriority, int numTasksToPreempt, String []poten break; } } + updateRunningTasksMetrics(); } finally { writeLock.unlock(); } @@ -970,6 +1033,7 @@ private void registerPendingPreemption(String host) { writeLock.lock(); try { pendingPreemptions.incrementAndGet(); + metrics.incrPendingPreemptionTasksCount(); MutableInt val = pendingPreemptionsPerHost.get(host); if (val == null) { val = new MutableInt(1); @@ -985,6 +1049,7 @@ private void unregisterPendingPreemption(String host) { writeLock.lock(); try { pendingPreemptions.decrementAndGet(); + metrics.decrPendingPreemptionTasksCount(); MutableInt val = pendingPreemptionsPerHost.get(host); Preconditions.checkNotNull(val); val.decrement(); @@ -1119,23 +1184,24 @@ public void shutdown() { private int numPreemptedTasks = 0; private int numScheduledTasks = 0; private final int numSchedulableTasks; - + private final LlapTaskSchedulerMetrics metrics; /** * Create a NodeInfo bound to a service instance - * - * @param serviceInstance the associated serviceInstance + * @param serviceInstance the associated serviceInstance * @param blacklistConf blacklist configuration * @param clock clock to use to obtain timing information * @param numSchedulableTasksConf number of schedulable tasks on the node. 0 represents auto - * detect based on the serviceInstance, -1 indicates indicates - * unlimited capacity +* detect based on the serviceInstance, -1 indicates indicates + * @param metrics */ - NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock, int numSchedulableTasksConf) { + NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock, + int numSchedulableTasksConf, final LlapTaskSchedulerMetrics metrics) { Preconditions.checkArgument(numSchedulableTasksConf >= -1, "NumSchedulableTasks must be >=-1"); this.serviceInstance = serviceInstance; this.blacklistConf = blacklistConf; this.clock = clock; + this.metrics = metrics; if (numSchedulableTasksConf == 0) { int pendingQueueuCapacity = 0; @@ -1154,6 +1220,7 @@ public void shutdown() { } else { this.numSchedulableTasks = numSchedulableTasksConf; } + metrics.incrSchedulableTasksCount(numSchedulableTasks); LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks); } @@ -1195,17 +1262,22 @@ void disableNode(boolean commFailure) { void registerTaskScheduled() { numScheduledTasks++; + metrics.incrScheduledTasksCount(); } void registerTaskSuccess() { numSuccessfulTasks++; + metrics.incrSuccessfulTasksCount(); numScheduledTasks--; + metrics.decrScheduledTasksCount(); } void registerUnsuccessfulTaskEnd(boolean wasPreempted) { numScheduledTasks--; + metrics.decrScheduledTasksCount(); if (wasPreempted) { numPreemptedTasks++; + metrics.incrPreemptedTasksCount(); } } diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java new file mode 100644 index 0000000..5a8ff7a --- /dev/null +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import org.apache.hadoop.metrics2.MetricsInfo; + +import com.google.common.base.Objects; + +/** + * Metrics information for llap task scheduler. + */ +public enum LlapTaskSchedulerInfo implements MetricsInfo { + SchedulerMetrics("Llap task scheduler related metrics"), + SchedulerClusterNodeCount("Number of nodes in the cluster"), + SchedulerExecutorsPerInstance("Total number of executor threads per node"), + SchedulerMemoryPerInstance("Total memory for executors per node in bytes"), + SchedulerCpuCoresPerInstance("Total CPU vCores per node"), + SchedulerDisabledNodeCount("Number of nodes disabled temporarily"), + SchedulerSchedulableTasksPerInstance("Total number of schedulable tasks per node"), + SchedulerLocalityDelay("Task scheduler locality delay in ms"), + SchedulerAliveTimeout("Timeout after which DAG is killed when no nodes are alive"), + SchedulerPendingTaskCount("Number of pending tasks"), + SchedulerSchedulableTaskCount("Total number of schedulable tasks"), + SchedulerScheduledTaskCount("Total number of scheduled tasks"), + SchedulerSuccessfulTaskCount("Total number of successful tasks"), + SchedulerRunningTaskCount("Total number of running tasks"), + SchedulerPendingPreemptionTaskCount("Total number of tasks pending for pre-emption"), + SchedulerPreemptedTaskCount("Total number of tasks pre-empted"), + SchedulerCompletedDagCount("Number of DAGs completed"); + + private final String desc; + + LlapTaskSchedulerInfo(String desc) { + this.desc = desc; + } + + @Override + public String description() { + return desc; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("name", name()).add("description", desc) + .toString(); + } +} \ No newline at end of file diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java new file mode 100644 index 0000000..a98ee51 --- /dev/null +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerAliveTimeout; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerClusterNodeCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerCompletedDagCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerCpuCoresPerInstance; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerDisabledNodeCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerExecutorsPerInstance; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerLocalityDelay; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerMemoryPerInstance; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerMetrics; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingPreemptionTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPreemptedTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerRunningTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSchedulableTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSchedulableTasksPerInstance; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerScheduledTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSuccessfulTaskCount; +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; +import org.apache.hadoop.hive.llap.metrics.MetricsUtils; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.source.JvmMetrics; + +/** + * Metrics about the llap daemon task scheduler. + */ +@Metrics(about = "LlapDaemon Task Scheduler Metrics", context = "scheduler") +public class LlapTaskSchedulerMetrics implements MetricsSource { + + private final String name; + private final JvmMetrics jvmMetrics; + private final String sessionId; + private final MetricsRegistry registry; + @Metric + MutableGaugeInt numExecutors; + @Metric + MutableGaugeLong memoryPerInstance; + @Metric + MutableGaugeInt cpuCoresPerInstance; + @Metric + MutableGaugeInt schedulableTasksPerInstance; + @Metric + MutableGaugeLong localityDelay; + @Metric + MutableGaugeInt clusterNodeCount; + @Metric + MutableGaugeInt disabledNodeCount; + @Metric + MutableGaugeLong aliveTimeout; + @Metric + MutableGaugeInt pendingTasksCount; + @Metric + MutableGaugeInt runningTasksCount; + @Metric + MutableCounterInt schedulableTasksCount; + @Metric + MutableCounterInt scheduledTasksCount; + @Metric + MutableCounterInt successfulTasksCount; + @Metric + MutableCounterInt preemptedTasksCount; + @Metric + MutableCounterInt completedDagcount; + @Metric + MutableCounterInt pendingPreemptionTasksCount; + + private LlapTaskSchedulerMetrics(String displayName, JvmMetrics jm, String sessionId) { + this.name = displayName; + this.jvmMetrics = jm; + this.sessionId = sessionId; + this.registry = new MetricsRegistry("LlapTaskSchedulerMetricsRegistry"); + this.registry.tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME).tag(SessionId, sessionId); + } + + public static LlapTaskSchedulerMetrics create(String displayName, String sessionId) { + MetricsSystem ms = LlapMetricsSystem.instance(); + JvmMetrics jm = JvmMetrics.create(MetricsUtils.METRICS_PROCESS_NAME, sessionId, ms); + return ms.register(displayName, "Llap Task Scheduler Metrics", + new LlapTaskSchedulerMetrics(displayName, jm, sessionId)); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean b) { + MetricsRecordBuilder rb = collector.addRecord(SchedulerMetrics) + .setContext("scheduler") + .tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME) + .tag(SessionId, sessionId); + getTaskSchedulerStats(rb); + } + + public void setNumExecutors(int value) { + numExecutors.set(value); + } + + public void setMemoryPerInstance(long value) { + memoryPerInstance.set(value); + } + + public void setCpuCoresPerInstance(int value) { + cpuCoresPerInstance.set(value); + } + + public void setSchedulableTasksPerInstance(int value) { + schedulableTasksPerInstance.set(value); + } + + public void setLocalityDelay(long value) { + localityDelay.set(value); + } + + public void setClusterNodeCount(int value) { + clusterNodeCount.set(value); + } + + public void setDisabledNodeCount(int value) { + disabledNodeCount.set(value); + } + + public void setAliveTimeout(long value) { + aliveTimeout.set(value); + } + + public void setPendingTasksCount(int value) { + pendingTasksCount.set(value); + } + + public void incrScheduledTasksCount() { + scheduledTasksCount.incr(); + } + + public void decrScheduledTasksCount() { + scheduledTasksCount.incr(-1); + } + + public void incrSchedulableTasksCount(int delta) { + schedulableTasksCount.incr(delta); + } + + public void incrSuccessfulTasksCount() { + successfulTasksCount.incr(); + } + + public void setRunningTasksCount(int value) { + runningTasksCount.set(value); + } + + public void incrPreemptedTasksCount() { + preemptedTasksCount.incr(); + } + + public void incrCompletedDagCount() { + completedDagcount.incr(); + } + + public void incrPendingPreemptionTasksCount() { + pendingPreemptionTasksCount.incr(); + } + + public void decrPendingPreemptionTasksCount() { + pendingPreemptionTasksCount.incr(-1); + } + + private void getTaskSchedulerStats(MetricsRecordBuilder rb) { + rb.addGauge(SchedulerClusterNodeCount, clusterNodeCount.value()) + .addGauge(SchedulerExecutorsPerInstance, numExecutors.value()) + .addGauge(SchedulerMemoryPerInstance, memoryPerInstance.value()) + .addGauge(SchedulerCpuCoresPerInstance, cpuCoresPerInstance.value()) + .addGauge(SchedulerDisabledNodeCount, disabledNodeCount.value()) + .addGauge(SchedulerSchedulableTasksPerInstance, schedulableTasksPerInstance.value()) + .addGauge(SchedulerLocalityDelay, localityDelay.value()) + .addGauge(SchedulerAliveTimeout, aliveTimeout.value()) + .addGauge(SchedulerPendingTaskCount, pendingTasksCount.value()) + .addGauge(SchedulerSchedulableTaskCount, schedulableTasksCount.value()) + .addGauge(SchedulerScheduledTaskCount, scheduledTasksCount.value()) + .addGauge(SchedulerSuccessfulTaskCount, successfulTasksCount.value()) + .addGauge(SchedulerRunningTaskCount, runningTasksCount.value()) + .addGauge(SchedulerPendingPreemptionTaskCount, pendingPreemptionTasksCount.value()) + .addGauge(SchedulerPreemptedTaskCount, preemptedTasksCount.value()) + .addGauge(SchedulerCompletedDagCount, completedDagcount.value()); + } + + public JvmMetrics getJvmMetrics() { + return jvmMetrics; + } + + public String getName() { + return name; + } +}