diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5b5b350..1b172d4 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -329,6 +329,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION.varname); + llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WEB_PORT.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WEB_SSL.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); @@ -2746,6 +2747,13 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " to a location other than the ones requested. Set to -1 for an infinite delay, 0" + "for a no delay. Currently these are the only two supported values" ), + LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS( + "hive.llap.daemon.task.preemption.metrics.intervals", "30,60,300", + "Comma-delimited set of integers denoting the desired rollover intervals (in seconds)\n" + + " for percentile latency metrics. Used by LLAP daemon task scheduler metrics for\n" + + " time taken to kill task (due to pre-emption) and useful time wasted by the task that\n" + + " is about to be preempted." 
+ ), LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE("hive.llap.daemon.task.scheduler.wait.queue.size", 10, "LLAP scheduler maximum queue size.", "llap.daemon.task.scheduler.wait.queue.size"), LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME( diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index e80fb15..c8718c3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -106,7 +106,7 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi String waitQueueSchedulerClassName = HiveConf.getVar( conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME); this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, - waitQueueSchedulerClassName, enablePreemption, classLoader); + waitQueueSchedulerClassName, enablePreemption, classLoader, metrics); addIfService(executorService); @@ -219,7 +219,6 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws .build(); } metrics.incrExecutorTotalRequestsHandled(); - metrics.incrExecutorNumQueuedRequests(); } finally { NDC.pop(); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 33b41e8..44c0bc2 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -20,7 +20,9 @@ import java.lang.management.MemoryType; import java.net.InetSocketAddress; import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ 
-68,6 +70,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; public class LlapDaemon extends CompositeService implements ContainerRunner, LlapDaemonMXBean { @@ -193,7 +196,21 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor String displayName = "LlapDaemonExecutorMetrics-" + MetricsUtils.getHostName(); String sessionId = MetricsUtils.getUUID(); daemonConf.set("llap.daemon.metrics.sessionid", sessionId); - this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors); + String[] strIntervals = HiveConf.getTrimmedStringsVar(daemonConf, + HiveConf.ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS); + List intervalList = new ArrayList<>(); + if (strIntervals != null) { + for (String strInterval : strIntervals) { + try { + intervalList.add(Integer.valueOf(strInterval)); + } catch (NumberFormatException e) { + LOG.warn("Ignoring task pre-emption metrics interval {} from {} as it is invalid", + strInterval, Arrays.toString(strIntervals)); + } + } + } + this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors, + Ints.toArray(intervalList)); this.metrics.setMemoryPerInstance(executorMemoryBytes); this.metrics.setCacheMemoryPerInstance(ioMemoryBytes); this.metrics.setJvmMaxMemory(maxJvmMemory); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 57dd828..2759f26 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.service.AbstractService; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; @@ -104,10 +105,11 @@ final ConcurrentMap knownTasks = new ConcurrentHashMap<>(); private final Object lock = new Object(); + private final LlapDaemonExecutorMetrics metrics; public TaskExecutorService(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption, - ClassLoader classLoader) { + ClassLoader classLoader, final LlapDaemonExecutorMetrics metrics) { super(TaskExecutorService.class.getSimpleName()); LOG.info("TaskExecutorService is being setup with parameters: " + "numExecutors=" + numExecutors @@ -127,6 +129,7 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, new PreemptionQueueComparator()); this.enablePreemption = enablePreemption; this.numSlotsAvailable = new AtomicInteger(numExecutors); + this.metrics = metrics; // single threaded scheduler for tasks from wait queue to executor threads ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() @@ -267,7 +270,11 @@ public void run() { trySchedule(task); // wait queue could have been re-ordered in the mean time because of concurrent task // submission. So remove the specific task instead of the head task. 
- waitQueue.remove(task); + if (waitQueue.remove(task)) { + if (metrics != null) { + metrics.setExecutorNumQueuedRequests(waitQueue.size()); + } + } } catch (RejectedExecutionException e) { rejectedException = e; } @@ -361,6 +368,9 @@ public SubmissionState schedule(TaskRunnerCallable task) { if (isDebugEnabled) { LOG.debug("{} is {} as wait queue is full", taskWrapper.getRequestId(), result); } + if (metrics != null) { + metrics.incrTotalRejectedRequests(); + } return result; } } @@ -392,11 +402,17 @@ public SubmissionState schedule(TaskRunnerCallable task) { LOG.info("{} evicted from wait queue in favor of {} because of lower priority", evictedTask.getRequestId(), task.getRequestId()); } + if (metrics != null) { + metrics.incrTotalEvictedFromWaitQueue(); + } } synchronized (lock) { lock.notify(); } + if (metrics != null) { + metrics.setExecutorNumQueuedRequests(waitQueue.size()); + } return result; } @@ -411,7 +427,11 @@ public void killFragment(String fragmentId) { LOG.debug("Removing {} from waitQueue", fragmentId); } taskWrapper.setIsInWaitQueue(false); - waitQueue.remove(taskWrapper); + if (waitQueue.remove(taskWrapper)) { + if (metrics != null) { + metrics.setExecutorNumQueuedRequests(waitQueue.size()); + } + } } if (taskWrapper.isInPreemptionQueue()) { if (isDebugEnabled) { @@ -419,6 +439,9 @@ public void killFragment(String fragmentId) { } taskWrapper.setIsInPreemptableQueue(false); preemptionQueue.remove(taskWrapper); + if (metrics != null) { + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + } } taskWrapper.getTaskRunnerCallable().killTask(); } else { @@ -511,11 +534,17 @@ private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishab LOG.debug("Removing {} from preemption queue because it's state changed to {}", taskWrapper.getRequestId(), newFinishableState); preemptionQueue.remove(taskWrapper.getTaskRunnerCallable()); + if (metrics != null) { + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + } 
} else if (newFinishableState == false && !taskWrapper.isInPreemptionQueue() && !taskWrapper.isInWaitQueue()) { LOG.debug("Adding {} to preemption queue since finishable state changed to {}", taskWrapper.getRequestId(), newFinishableState); preemptionQueue.offer(taskWrapper); + if (metrics != null) { + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + } } lock.notify(); } @@ -525,6 +554,9 @@ private void addToPreemptionQueue(TaskWrapper taskWrapper) { synchronized (lock) { preemptionQueue.add(taskWrapper); taskWrapper.setIsInPreemptableQueue(true); + if (metrics != null) { + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + } } } @@ -534,6 +566,9 @@ private TaskWrapper removeAndGetFromPreemptionQueue() { taskWrapper = preemptionQueue.remove(); if (taskWrapper != null) { taskWrapper.setIsInPreemptableQueue(false); + if (metrics != null) { + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + } } } return taskWrapper; @@ -582,6 +617,9 @@ private void updatePreemptionListAndNotify(EndReason reason) { .getTaskIdentifierString(taskWrapper.getTaskRunnerCallable().getRequest()) + " request " + state + "! 
Removed from preemption list."); } + if (metrics != null) { + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + } } numSlotsAvailable.incrementAndGet(); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index a1cfbb8..1a5ce64 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -417,6 +417,7 @@ public void onSuccess(TaskRunner2Result result) { break; case CONTAINER_STOP_REQUESTED: LOG.info("Received container stop request (AM preemption) for {}", requestId); + metrics.incrExecutorTotalKilled(); break; case KILL_REQUESTED: LOG.info("Killed task {}", requestId); @@ -424,8 +425,9 @@ public void onSuccess(TaskRunner2Result result) { killtimerWatch.stop(); long elapsed = killtimerWatch.elapsedMillis(); LOG.info("Time to die for task {}", elapsed); + metrics.addMetricsPreemptionTimeToKill(elapsed); } - metrics.incrPreemptionTimeLost(runtimeWatch.elapsedMillis()); + metrics.addMetricsPreemptionTimeLost(runtimeWatch.elapsedMillis()); metrics.incrExecutorTotalKilled(); break; case COMMUNICATION_FAILURE: @@ -448,7 +450,6 @@ public void onSuccess(TaskRunner2Result result) { request.getFragmentSpec().getFragmentNumber(), request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, taskRunnerCallable.startTime, true); - metrics.decrExecutorNumQueuedRequests(); } @Override @@ -466,9 +467,6 @@ public void onFailure(Throwable t) { request.getFragmentSpec().getFragmentNumber(), request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, taskRunnerCallable.startTime, false); - if (metrics != null) { - metrics.decrExecutorNumQueuedRequests(); - } } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java index 941d926..cfbb2bc 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java @@ -26,7 +26,7 @@ */ public enum LlapDaemonExecutorInfo implements MetricsInfo { ExecutorMetrics("Llap daemon cache related metrics"), - ExecutorThreadCountPerInstance("Total number of executor threads per node"), + ExecutorNumExecutorsPerInstance("Total number of executor threads per node"), ExecutorMemoryPerInstance("Total memory for executors per node in bytes"), ExecutorCacheMemoryPerInstance("Total Cache memory per node in bytes"), ExecutorJvmMaxMemory("Max memory available for JVM in bytes"), @@ -36,11 +36,19 @@ ExecutorThreadUserTime("User time in nanoseconds"), ExecutorTotalRequestsHandled("Total number of requests handled by the container"), ExecutorNumQueuedRequests("Number of requests queued by the container for processing"), + ExecutorNumPreemptableRequests("Number of queued requests that are pre-emptable"), + ExecutorTotalRejectedRequests("Total number of requests rejected as wait queue being full"), ExecutorTotalSuccess("Total number of requests handled by the container that succeeded"), - ExecutorTotalExecutionFailure("Total number of requests handled by the container that failed execution"), - ExecutorTotalInterrupted("Total number of requests handled by the container that got interrupted"), + ExecutorTotalFailed("Total number of requests handled by the container that failed execution"), + ExecutorTotalKilled("Total number of requests handled by the container that got interrupted"), ExecutorTotalAskedToDie("Total number of requests handled by the container that were asked to die"), - PreemptionTimeLost("Total time lost due to task preemptions"); + ExecutorTotalPreemptionTimeToKill("Total amount of time taken for killing tasks due to pre-emption"), + 
ExecutorTotalPreemptionTimeLost("Total useful cluster time lost because of pre-emption"), + ExecutorPercentileTimeToKill("Percentile time to kill for pre-empted tasks"), + ExecutorPercentileTimeLost("Percentile cluster time wasted due to pre-emption"), + ExecutorMaxPreemptionTimeToKill("Max time for killing pre-empted task"), + ExecutorMaxPreemptionTimeLost("Max cluster time lost due to pre-emption"), + ExecutorTotalEvictedFromWaitQueue("Total number of tasks evicted from wait queue because of low priority"); private final String desc; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java index 894880f..14722f5 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java @@ -19,20 +19,25 @@ import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorCacheMemoryPerInstance; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorJvmMaxMemory; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeLost; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeToKill; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMemoryPerInstance; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumPreemptableRequests; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumQueuedRequests; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorRpcNumHandlers; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadCPUTime; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadCountPerInstance; +import 
static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsPerInstance; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadUserTime; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalAskedToDie; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalExecutionFailure; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalInterrupted; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalEvictedFromWaitQueue; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalFailed; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalKilled; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalRejectedRequests; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalSuccess; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMetrics; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeLost; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeToKill; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorWaitQueueSize; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.PreemptionTimeLost; import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; @@ -54,6 +59,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; import 
org.apache.hadoop.metrics2.source.JvmMetrics; /** @@ -70,13 +76,21 @@ private final ThreadMXBean threadMXBean; private final Map cpuMetricsInfoMap; private final Map userMetricsInfoMap; + private long maxTimeLost = Long.MIN_VALUE; + private long maxTimeToKill = Long.MIN_VALUE; final MutableGaugeLong[] executorThreadCpuTime; final MutableGaugeLong[] executorThreadUserTime; @Metric MutableCounterLong executorTotalRequestHandled; @Metric - MutableCounterLong executorNumQueuedRequests; + MutableGaugeInt executorNumQueuedRequests; + @Metric + MutableGaugeInt executorNumPreemptableRequests; + @Metric + MutableCounterLong totalRejectedRequests; + @Metric + MutableCounterLong totalEvictedFromWaitQueue; @Metric MutableCounterLong executorTotalSuccess; @Metric @@ -84,8 +98,6 @@ @Metric MutableCounterLong executorTotalExecutionFailed; @Metric - MutableCounterLong preemptionTimeLost; - @Metric MutableGaugeLong cacheMemoryPerInstance; @Metric MutableGaugeLong memoryPerInstance; @@ -95,9 +107,21 @@ MutableGaugeInt waitQueueSize; @Metric MutableGaugeInt rpcNumHandlers; + @Metric + MutableCounterLong totalPreemptionTimeToKill; + @Metric + MutableCounterLong totalPreemptionTimeLost; + @Metric + MutableGaugeLong maxPreemptionTimeToKill; + @Metric + MutableGaugeLong maxPreemptionTimeLost; + @Metric + final MutableQuantiles[] percentileTimeToKill; + @Metric + final MutableQuantiles[] percentileTimeLost; private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId, - int numExecutors) { + int numExecutors, final int[] intervals) { this.name = displayName; this.jvmMetrics = jm; this.sessionId = sessionId; @@ -110,6 +134,21 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sess this.cpuMetricsInfoMap = new ConcurrentHashMap<>(); this.userMetricsInfoMap = new ConcurrentHashMap<>(); + final int len = intervals == null ? 
0 : intervals.length; + this.percentileTimeToKill = new MutableQuantiles[len]; + this.percentileTimeLost = new MutableQuantiles[len]; + for (int i=0; i maxTimeLost) { + maxTimeLost = value; + maxPreemptionTimeLost.set(maxTimeLost); + } + + for (MutableQuantiles q : percentileTimeLost) { + q.add(value); + } + } + + public void addMetricsPreemptionTimeToKill(long value) { + totalPreemptionTimeToKill.incr(value); + + if (value > maxTimeToKill) { + maxTimeToKill = value; + maxPreemptionTimeToKill.set(maxTimeToKill); + } + + for (MutableQuantiles q : percentileTimeToKill) { + q.add(value); + } } public void incrExecutorTotalKilled() { @@ -191,17 +260,31 @@ private void getExecutorStats(MetricsRecordBuilder rb) { updateThreadMetrics(rb); rb.addCounter(ExecutorTotalRequestsHandled, executorTotalRequestHandled.value()) - .addCounter(ExecutorNumQueuedRequests, executorNumQueuedRequests.value()) .addCounter(ExecutorTotalSuccess, executorTotalSuccess.value()) - .addCounter(ExecutorTotalExecutionFailure, executorTotalExecutionFailed.value()) - .addCounter(ExecutorTotalInterrupted, executorTotalIKilled.value()) - .addCounter(PreemptionTimeLost, preemptionTimeLost.value()) - .addGauge(ExecutorThreadCountPerInstance, numExecutors) + .addCounter(ExecutorTotalFailed, executorTotalExecutionFailed.value()) + .addCounter(ExecutorTotalKilled, executorTotalIKilled.value()) + .addCounter(ExecutorTotalEvictedFromWaitQueue, totalEvictedFromWaitQueue.value()) + .addCounter(ExecutorTotalRejectedRequests, totalRejectedRequests.value()) + .addGauge(ExecutorNumQueuedRequests, executorNumQueuedRequests.value()) + .addGauge(ExecutorNumPreemptableRequests, executorNumPreemptableRequests.value()) + .addGauge(ExecutorNumExecutorsPerInstance, numExecutors) .addGauge(ExecutorMemoryPerInstance, memoryPerInstance.value()) .addGauge(ExecutorCacheMemoryPerInstance, cacheMemoryPerInstance.value()) .addGauge(ExecutorJvmMaxMemory, jvmMaxMemory.value()) .addGauge(ExecutorWaitQueueSize, waitQueueSize.value()) 
- .addGauge(ExecutorRpcNumHandlers, rpcNumHandlers.value()); + .addGauge(ExecutorRpcNumHandlers, rpcNumHandlers.value()) + .addCounter(ExecutorTotalPreemptionTimeToKill, totalPreemptionTimeToKill.value()) + .addCounter(ExecutorTotalPreemptionTimeLost, totalPreemptionTimeLost.value()) + .addGauge(ExecutorMaxPreemptionTimeToKill, maxPreemptionTimeToKill.value()) + .addGauge(ExecutorMaxPreemptionTimeLost, maxPreemptionTimeLost.value()); + + for (MutableQuantiles q : percentileTimeToKill) { + q.snapshot(rb, true); + } + + for (MutableQuantiles q : percentileTimeLost) { + q.snapshot(rb, true); + } } private void updateThreadMetrics(MetricsRecordBuilder rb) { diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index d1edd12..506f611 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -190,7 +190,7 @@ public void testWaitQueuePreemption() throws InterruptedException { public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption) { super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), null); } private ConcurrentMap completionListeners = new ConcurrentHashMap<>(); diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index b57ae1a..506354d 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -54,10 +54,14 @@ import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; +import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics; +import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -163,6 +167,9 @@ public int compare(Priority o1, Priority o2) { @VisibleForTesting StatsPerDag dagStats = new StatsPerDag(); + private final LlapTaskSchedulerMetrics metrics; + private final JvmPauseMonitor pauseMonitor; + public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { this(taskSchedulerContext, new SystemClock()); } @@ -218,6 +225,21 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build()); schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw); + // Initialize the metrics system + LlapMetricsSystem.initialize("LlapDaemon"); + this.pauseMonitor = new JvmPauseMonitor(conf); + pauseMonitor.start(); + String displayName = "LlapTaskSchedulerMetrics-" + MetricsUtils.getHostName(); + String sessionId = conf.get("llap.daemon.metrics.sessionid"); + // TODO: Not sure about the use of this. Should we instead use workerIdentity as sessionId? 
+ this.metrics = LlapTaskSchedulerMetrics.create(displayName, sessionId); + this.metrics.setNumExecutors(executorsPerInstance); + this.metrics.setMemoryPerInstance(memoryPerInstance * 1024L * 1024L); + this.metrics.setCpuCoresPerInstance(coresPerExecutor); + this.metrics.setLocalityDelay(localityDelayMs); + // TODO: Set this to proper value after HIVE-12959 + this.metrics.setAliveTimeout(60000); + this.metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance=" + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor @@ -262,7 +284,8 @@ public void onFailure(Throwable t) { registry.registerStateChangeListener(new NodeStateChangeListener()); activeInstances = registry.getInstances(); for (ServiceInstance inst : activeInstances.getAll().values()) { - addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode)); + addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode, + metrics)); } } finally { writeLock.unlock(); @@ -275,14 +298,14 @@ public void onFailure(Throwable t) { @Override public void onCreate(final ServiceInstance serviceInstance) { addNode(serviceInstance, new NodeInfo(serviceInstance, nodeBlacklistConf, clock, - numSchedulableTasksPerNode)); + numSchedulableTasksPerNode, metrics)); LOG.info("Added node with identity: {}", serviceInstance.getWorkerIdentity()); } @Override public void onUpdate(final ServiceInstance serviceInstance) { instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance, - nodeBlacklistConf, clock, numSchedulableTasksPerNode)); + nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics)); LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity()); } @@ -291,6 +314,7 @@ public void onRemove(final ServiceInstance serviceInstance) { // FIXME: disabling this 
for now // instanceToNodeMap.remove(serviceInstance.getWorkerIdentity()); LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity()); + metrics.setClusterNodeCount(activeInstances.getAll().size()); } } @@ -314,6 +338,15 @@ public void shutdown() { if (registry != null) { registry.stop(); } + + if (pauseMonitor != null) { + pauseMonitor.stop(); + } + + if (metrics != null) { + LlapMetricsSystem.shutdown(); + } + } } finally { writeLock.unlock(); @@ -390,6 +423,7 @@ public void dagComplete() { // This is effectively DAG completed, and can be used to reset statistics being tracked. LOG.info("DAG: " + dagCounter.get() + " completed. Scheduling stats: " + dagStats); dagCounter.incrementAndGet(); + metrics.incrCompletedDagCount(); dagStats = new StatsPerDag(); } @@ -480,9 +514,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd // Also reset commFailures since a task was able to communicate back and indicate success. nodeInfo.enableNode(); // Re-insert into the queue to force the poll thread to remove the element. - if ( disabledNodesQueue.remove(nodeInfo)) { - disabledNodesQueue.add(nodeInfo); - } + reinsertNodeInfo(nodeInfo); } // In case of success, trigger a scheduling run for pending tasks. trySchedulingPendingTasks(); @@ -498,9 +530,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd // Also reset commFailures since a task was able to communicate back and indicate success. nodeInfo.enableNode(); // Re-insert into the queue to force the poll thread to remove the element. - if (disabledNodesQueue.remove(nodeInfo)) { - disabledNodesQueue.add(nodeInfo); - } + reinsertNodeInfo(nodeInfo); } // In case of success, trigger a scheduling run for pending tasks. 
trySchedulingPendingTasks(); @@ -535,6 +565,13 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd return true; } + private void reinsertNodeInfo(final NodeInfo nodeInfo) { + if ( disabledNodesQueue.remove(nodeInfo)) { + disabledNodesQueue.add(nodeInfo); + } + metrics.setDisabledNodeCount(disabledNodesQueue.size()); + } + @Override public Object deallocateContainer(ContainerId containerId) { LOG.debug("Ignoring deallocateContainer for containerId: " + containerId); @@ -647,7 +684,8 @@ private void scanForNodeChanges() { if (inst.isAlive() && instanceToNodeMap.containsKey(inst.getWorkerIdentity()) == false) { /* that's a good node, not added to the allocations yet */ LOG.info("Found a new node: " + inst + "."); - addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode)); + addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode, + metrics)); } } } finally { @@ -658,6 +696,7 @@ private void scanForNodeChanges() { private void addNode(ServiceInstance inst, NodeInfo node) { LOG.info("Adding node: " + inst); instanceToNodeMap.put(inst.getWorkerIdentity(), node); + metrics.setClusterNodeCount(activeInstances.getAll().size()); // Trigger scheduling since a new node became available. 
trySchedulingPendingTasks(); } @@ -690,6 +729,7 @@ private void disableInstance(ServiceInstance instance, boolean isCommFailure) { nodeInfo.disableNode(isCommFailure); // TODO: handle task to container map events in case of hard failures disabledNodesQueue.add(nodeInfo); + metrics.setDisabledNodeCount(disabledNodesQueue.size()); } } finally { writeLock.unlock(); @@ -706,6 +746,9 @@ private void addPendingTask(TaskInfo taskInfo) { } tasksAtPriority.add(taskInfo); knownTasks.putIfAbsent(taskInfo.task, taskInfo); + if (metrics != null) { + metrics.incrPendingTasksCount(); + } } finally { writeLock.unlock(); } @@ -737,6 +780,9 @@ private void registerRunningTask(TaskInfo taskInfo) { runningTasks.put(priority, tasksAtpriority); } tasksAtpriority.add(taskInfo); + if (metrics != null) { + metrics.decrPendingTasksCount(); + } } finally { writeLock.unlock(); } @@ -970,6 +1016,7 @@ private void registerPendingPreemption(String host) { writeLock.lock(); try { pendingPreemptions.incrementAndGet(); + metrics.incrPendingPreemptionTasksCount(); MutableInt val = pendingPreemptionsPerHost.get(host); if (val == null) { val = new MutableInt(1); @@ -985,6 +1032,7 @@ private void unregisterPendingPreemption(String host) { writeLock.lock(); try { pendingPreemptions.decrementAndGet(); + metrics.decrPendingPreemptionTasksCount(); MutableInt val = pendingPreemptionsPerHost.get(host); Preconditions.checkNotNull(val); val.decrement(); @@ -1119,23 +1167,24 @@ public void shutdown() { private int numPreemptedTasks = 0; private int numScheduledTasks = 0; private final int numSchedulableTasks; - + private final LlapTaskSchedulerMetrics metrics; /** * Create a NodeInfo bound to a service instance - * - * @param serviceInstance the associated serviceInstance + * @param serviceInstance the associated serviceInstance * @param blacklistConf blacklist configuration * @param clock clock to use to obtain timing information * @param numSchedulableTasksConf number of schedulable tasks on the node. 
0 represents auto - * detect based on the serviceInstance, -1 indicates indicates - * unlimited capacity + * detect based on the serviceInstance, -1 indicates unlimited capacity + * @param metrics */ - NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock, int numSchedulableTasksConf) { + NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock, + int numSchedulableTasksConf, final LlapTaskSchedulerMetrics metrics) { Preconditions.checkArgument(numSchedulableTasksConf >= -1, "NumSchedulableTasks must be >=-1"); this.serviceInstance = serviceInstance; this.blacklistConf = blacklistConf; this.clock = clock; + this.metrics = metrics; if (numSchedulableTasksConf == 0) { int pendingQueueuCapacity = 0; @@ -1154,6 +1203,7 @@ public void shutdown() { } else { this.numSchedulableTasks = numSchedulableTasksConf; } + metrics.incrSchedulableTasksCount(numSchedulableTasks); LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks); } @@ -1195,17 +1245,25 @@ void disableNode(boolean commFailure) { void registerTaskScheduled() { numScheduledTasks++; + metrics.incrRunningTasksCount(); + metrics.decrSchedulableTasksCount(); } void registerTaskSuccess() { numSuccessfulTasks++; + metrics.incrSuccessfulTasksCount(); numScheduledTasks--; + metrics.decrRunningTasksCount(); + metrics.incrSchedulableTasksCount(); } void registerUnsuccessfulTaskEnd(boolean wasPreempted) { numScheduledTasks--; + metrics.decrRunningTasksCount(); + metrics.incrSchedulableTasksCount(); if (wasPreempted) { numPreemptedTasks++; + metrics.incrPreemptedTasksCount(); } } diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java new file mode 100644 index 0000000..8b256fa --- /dev/null +++
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import org.apache.hadoop.metrics2.MetricsInfo; + +import com.google.common.base.Objects; + +/** + * Metrics information for llap task scheduler. 
+ */ +public enum LlapTaskSchedulerInfo implements MetricsInfo { + SchedulerMetrics("Llap task scheduler related metrics"), + SchedulerClusterNodeCount("Number of nodes in the cluster"), + SchedulerExecutorsPerInstance("Total number of executor threads per node"), + SchedulerMemoryPerInstance("Total memory for executors per node in bytes"), + SchedulerCpuCoresPerInstance("Total CPU vCores per node"), + SchedulerDisabledNodeCount("Number of nodes disabled temporarily"), + SchedulerLocalityDelay("Task scheduler locality delay in ms"), + SchedulerAliveTimeout("Timeout after which DAG is killed when no nodes are alive"), + SchedulerPendingTaskCount("Number of pending tasks"), + SchedulerSchedulableTaskCount("Total number of schedulable tasks"), + SchedulerSuccessfulTaskCount("Total number of successful tasks"), + SchedulerRunningTaskCount("Total number of running tasks"), + SchedulerPendingPreemptionTaskCount("Total number of tasks pending for pre-emption"), + SchedulerPreemptedTaskCount("Total number of tasks pre-empted"), + SchedulerCompletedDagCount("Number of DAGs completed"); + + private final String desc; + + LlapTaskSchedulerInfo(String desc) { + this.desc = desc; + } + + @Override + public String description() { + return desc; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("name", name()).add("description", desc) + .toString(); + } +} \ No newline at end of file diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java new file mode 100644 index 0000000..22dcb11 --- /dev/null +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerAliveTimeout; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerClusterNodeCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerCompletedDagCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerCpuCoresPerInstance; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerDisabledNodeCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerExecutorsPerInstance; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerLocalityDelay; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerMemoryPerInstance; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerMetrics; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingPreemptionTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingTaskCount; +import 
static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPreemptedTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerRunningTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSchedulableTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSuccessfulTaskCount; +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; +import org.apache.hadoop.hive.llap.metrics.MetricsUtils; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.source.JvmMetrics; + +/** + * Metrics about the llap daemon task scheduler. 
+ */ +@Metrics(about = "LlapDaemon Task Scheduler Metrics", context = "scheduler") +public class LlapTaskSchedulerMetrics implements MetricsSource { + + private final String name; + private final JvmMetrics jvmMetrics; + private final String sessionId; + private final MetricsRegistry registry; + @Metric + MutableGaugeInt numExecutors; + @Metric + MutableGaugeLong memoryPerInstance; + @Metric + MutableGaugeInt cpuCoresPerInstance; + @Metric + MutableGaugeLong localityDelay; + @Metric + MutableGaugeInt clusterNodeCount; + @Metric + MutableGaugeInt disabledNodeCount; + @Metric + MutableGaugeLong aliveTimeout; + @Metric + MutableCounterInt pendingTasksCount; + @Metric + MutableCounterInt schedulableTasksCount; + @Metric + MutableCounterInt rejectedTasksCount; + @Metric + MutableCounterInt runningTasksCount; + @Metric + MutableCounterInt successfulTasksCount; + @Metric + MutableCounterInt preemptedTasksCount; + @Metric + MutableCounterInt completedDagcount; + @Metric + MutableCounterInt pendingPreemptionTasksCount; + + private LlapTaskSchedulerMetrics(String displayName, JvmMetrics jm, String sessionId) { + this.name = displayName; + this.jvmMetrics = jm; + this.sessionId = sessionId; + this.registry = new MetricsRegistry("LlapTaskSchedulerMetricsRegistry"); + this.registry.tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME).tag(SessionId, sessionId); + } + + public static LlapTaskSchedulerMetrics create(String displayName, String sessionId) { + MetricsSystem ms = LlapMetricsSystem.instance(); + JvmMetrics jm = JvmMetrics.create(MetricsUtils.METRICS_PROCESS_NAME, sessionId, ms); + return ms.register(displayName, "Llap Task Scheduler Metrics", + new LlapTaskSchedulerMetrics(displayName, jm, sessionId)); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean b) { + MetricsRecordBuilder rb = collector.addRecord(SchedulerMetrics) + .setContext("scheduler") + .tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME) + .tag(SessionId, sessionId); + 
getTaskSchedulerStats(rb); + } + + public void setNumExecutors(int value) { + numExecutors.set(value); + } + + public void setMemoryPerInstance(long value) { + memoryPerInstance.set(value); + } + + public void setCpuCoresPerInstance(int value) { + cpuCoresPerInstance.set(value); + } + + public void setLocalityDelay(long value) { + localityDelay.set(value); + } + + public void setClusterNodeCount(int value) { + clusterNodeCount.set(value); + } + + public void setDisabledNodeCount(int value) { + disabledNodeCount.set(value); + } + + public void setAliveTimeout(long value) { + aliveTimeout.set(value); + } + + public void incrPendingTasksCount() { + pendingTasksCount.incr(); + } + + public void decrPendingTasksCount() { + pendingTasksCount.incr(-1); + } + + public void incrSchedulableTasksCount(int delta) { + schedulableTasksCount.incr(delta); + } + + public void incrSchedulableTasksCount() { + schedulableTasksCount.incr(); + } + + public void decrSchedulableTasksCount() { + schedulableTasksCount.incr(-1); + } + + public void incrSuccessfulTasksCount() { + successfulTasksCount.incr(); + } + + public void incrRunningTasksCount() { + runningTasksCount.incr(); + } + + public void decrRunningTasksCount() { + runningTasksCount.incr(-1); + } + + public void incrPreemptedTasksCount() { + preemptedTasksCount.incr(); + } + + public void incrCompletedDagCount() { + completedDagcount.incr(); + } + + public void incrPendingPreemptionTasksCount() { + pendingPreemptionTasksCount.incr(); + } + + public void decrPendingPreemptionTasksCount() { + pendingPreemptionTasksCount.incr(-1); + } + + private void getTaskSchedulerStats(MetricsRecordBuilder rb) { + rb.addGauge(SchedulerClusterNodeCount, clusterNodeCount.value()) + .addGauge(SchedulerExecutorsPerInstance, numExecutors.value()) + .addGauge(SchedulerMemoryPerInstance, memoryPerInstance.value()) + .addGauge(SchedulerCpuCoresPerInstance, cpuCoresPerInstance.value()) + .addGauge(SchedulerDisabledNodeCount, disabledNodeCount.value()) + 
.addGauge(SchedulerLocalityDelay, localityDelay.value()) + .addGauge(SchedulerAliveTimeout, aliveTimeout.value()) + .addCounter(SchedulerPendingTaskCount, pendingTasksCount.value()) + .addCounter(SchedulerSchedulableTaskCount, schedulableTasksCount.value()) + .addCounter(SchedulerRunningTaskCount, runningTasksCount.value()) + .addCounter(SchedulerSuccessfulTaskCount, successfulTasksCount.value()) + .addCounter(SchedulerPendingPreemptionTaskCount, pendingPreemptionTasksCount.value()) + .addCounter(SchedulerPreemptedTaskCount, preemptedTasksCount.value()) + .addCounter(SchedulerCompletedDagCount, completedDagcount.value()); + } + + public JvmMetrics getJvmMetrics() { + return jvmMetrics; + } + + public String getName() { + return name; + } +}