diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 03a80198c3..657b72bd19 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4392,6 +4392,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Whether non-finishable running tasks (e.g. a reducer waiting for inputs) should be\n" + "preempted by finishable tasks inside LLAP scheduler.", "llap.daemon.task.scheduler.enable.preemption"), + LLAP_DAEMON_METRICS_SIMPLE_AVERAGE_DATA_POINTS( + "hive.llap.daemon.metrics.simple.average.data.points", 0, + "The number of data points stored for calculating executor metrics simple averages.\n" + + "Currently used for AverageQueueTime and AverageResponseTime\n" + + "0 means that average calculation is turned off"), LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS( "hive.llap.task.communicator.connection.timeout.ms", "16000ms", new TimeValidator(TimeUnit.MILLISECONDS), diff --git llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java index 231c9629bf..b41d3f5d63 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java @@ -49,6 +49,16 @@ public void changeStateRunning(boolean isGuaranteed) { changeState(State.RUNNING, getRunningCounter(isGuaranteed)); } + public long getQueueTime() { + return fixedCounters.get(LlapWmCounters.GUARANTEED_QUEUED_NS.ordinal()) + + fixedCounters.get(LlapWmCounters.SPECULATIVE_QUEUED_NS.ordinal()); + } + + public long getRunningTime() { + return fixedCounters.get(LlapWmCounters.GUARANTEED_RUNNING_NS.ordinal()) + + fixedCounters.get(LlapWmCounters.SPECULATIVE_RUNNING_NS.ordinal()); + } + private static LlapWmCounters getQueuedCounter(boolean isGuaranteed) { return isGuaranteed ? LlapWmCounters.GUARANTEED_QUEUED_NS : LlapWmCounters.SPECULATIVE_QUEUED_NS; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 14dc6326f2..755dd77465 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -26,6 +26,7 @@ import java.util.Date; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -180,6 +181,12 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE); boolean enablePreemption = HiveConf.getBoolVar( daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION); + int simpleAverageWindowDataSize = HiveConf.getIntVar( + daemonConf, ConfVars.LLAP_DAEMON_METRICS_SIMPLE_AVERAGE_DATA_POINTS); + + Preconditions.checkArgument(simpleAverageWindowDataSize >= 0, + "hive.llap.daemon.metrics.simple.average.data.points should be greater or equal to 0"); + final String logMsg = "Attempting to start LlapDaemon with the following configuration: " + "maxJvmMemory=" + maxJvmMemory + " (" + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" + @@ -202,6 +209,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor ", shufflePort=" + shufflePort + ", waitQueueSize= " + waitQueueSize + ", enablePreemption= " + enablePreemption + + ", simpleAverageWindowDataSize= " + simpleAverageWindowDataSize + ", versionInfo= (" + HiveVersionInfo.getBuildVersion() + ")"; LOG.info(logMsg); final String currTSISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date()); @@ -209,6 +217,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor System.err.println(currTSISO8601 + " " + logMsg); + long memRequired = executorMemoryBytes + (ioEnabled && isDirectCache == false ? ioMemoryBytes : 0); // TODO: this check is somewhat bogus as the maxJvmMemory != Xmx parameters (see annotation in LlapServiceDriver) @@ -264,7 +273,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor } } this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors, - Ints.toArray(intervalList)); + Ints.toArray(intervalList), simpleAverageWindowDataSize); this.metrics.setMemoryPerInstance(executorMemoryPerInstance); this.metrics.setCacheMemoryPerInstance(ioMemoryBytes); this.metrics.setJvmMaxMemory(maxJvmMemory); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 82bb06adfd..10d78bd1a3 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -929,6 +929,10 @@ public void onSuccess(TaskRunner2Result result) { taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); taskWrapper.getTaskRunnerCallable().setWmCountersDone(); + if (metrics != null) { + metrics.addMetricsQueueTime(taskWrapper.getTaskRunnerCallable().getQueueTime()); + metrics.addMetricsRunningTime(taskWrapper.getTaskRunnerCallable().getRunningTime()); + } updatePreemptionListAndNotify(result.getEndReason()); taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index e86a96c1e6..5dc62828d3 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -642,4 +642,18 @@ public void setWmCountersRunning() { wmCounters.changeStateRunning(isGuaranteed); } } + + public long getQueueTime() { + if (wmCounters != null) { + return wmCounters.getQueueTime(); + } + return 0; + } + + public long getRunningTime() { + if (wmCounters != null) { + return wmCounters.getRunningTime(); + } + return 0; + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java index 230cee5941..0244b1f77c 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java @@ -60,6 +60,8 @@ ExecutorFallOffKilledTimeLost("Total time lost in an executor completing after informing the AM - killed fragments"), ExecutorFallOffKilledMaxTimeLost("Max value of time lost in an executor completing after informing the AM - killed fragments"), ExecutorFallOffNumCompletedFragments("Number of completed fragments w.r.t falloff values"), + AverageQueueTime("Average queue time for tasks"), + AverageResponseTime("Average response time for successful tasks"), ; private final String desc; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java index 5129b93249..a24151fc10 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.llap.metrics; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.AverageQueueTime; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.AverageResponseTime; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlots; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlotsPercent; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorCacheMemoryPerInstance; @@ -58,6 +60,8 @@ import java.util.concurrent.ConcurrentHashMap; import com.google.common.collect.Maps; +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; +import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics; import org.apache.hadoop.hive.common.JvmMetrics; import org.apache.hadoop.hive.llap.daemon.impl.ContainerRunnerImpl; import org.apache.hadoop.metrics2.MetricsCollector; @@ -96,6 +100,9 @@ private final Map executorNames; + private final DescriptiveStatistics queueTime; + private final DescriptiveStatistics runningTime; + final MutableGaugeLong[] executorThreadCpuTime; final MutableGaugeLong[] executorThreadUserTime; @Metric @@ -155,7 +162,7 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId, - int numExecutors, final int[] intervals) { + int numExecutors, final int[] intervals, int simpleAverageWindowDataSize) { this.name = displayName; this.jvmMetrics = jm; this.sessionId = sessionId; @@ -195,14 +202,22 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sess this.executorThreadUserTime[i] = registry.newGauge(miu, 0L); this.executorNames.put(ContainerRunnerImpl.THREAD_NAME_FORMAT_PREFIX + i, i); } + if (simpleAverageWindowDataSize > 0) { + this.queueTime = new SynchronizedDescriptiveStatistics(simpleAverageWindowDataSize); + this.runningTime = new SynchronizedDescriptiveStatistics(simpleAverageWindowDataSize); + } else { + this.queueTime = null; + this.runningTime = null; + } } public static LlapDaemonExecutorMetrics create(String displayName, String sessionId, - int numExecutors, final int[] intervals) { + int numExecutors, final int[] intervals, int simpleAverageWindowDataSize) { MetricsSystem ms = LlapMetricsSystem.instance(); JvmMetrics jm = JvmMetrics.create(MetricsUtils.METRICS_PROCESS_NAME, sessionId, ms); return ms.register(displayName, "LlapDaemon Executor Metrics", - new LlapDaemonExecutorMetrics(displayName, jm, sessionId, numExecutors, intervals)); + new LlapDaemonExecutorMetrics(displayName, jm, sessionId, numExecutors, intervals, + simpleAverageWindowDataSize)); } @Override @@ -303,6 +318,18 @@ public void incrExecutorTotalKilled() { executorTotalIKilled.incr(); } + public void addMetricsQueueTime(long queueTime) { + if (this.queueTime != null) { + this.queueTime.addValue(queueTime); + } + } + + public void addMetricsRunningTime(long runningTime) { + if (this.runningTime != null) { + this.runningTime.addValue(runningTime); + } + } + public void setCacheMemoryPerInstance(long value) { cacheMemoryPerInstance.set(value); } @@ -356,6 +383,13 @@ private void getExecutorStats(MetricsRecordBuilder rb) { .addGauge(ExecutorFallOffKilledMaxTimeLost, fallOffMaxKilledTimeLost.value()) .addCounter(ExecutorFallOffNumCompletedFragments, fallOffNumCompletedFragments.value()); + if (queueTime != null) { + rb.addGauge(AverageQueueTime, queueTime.getSum() / queueTime.getN()); + } + if (runningTime != null) { + rb.addGauge(AverageResponseTime, runningTime.getSum() / runningTime.getN()); + } + for (MutableQuantiles q : percentileTimeToKill) { q.snapshot(rb, true); } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java index ea4f50b4d0..64645afe17 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java @@ -99,7 +99,7 @@ public void setup() throws Exception { this.metrics = LlapDaemonExecutorMetrics .create("ContinerRunerTests", MetricsUtils.getUUID(), numExecutors, - Ints.toArray(intervalList)); + Ints.toArray(intervalList), 0); for (int i = 0; i < numLocalDirs; i++) { File f = new File(testWorkDir, "localDir");