diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2c292392de..43905fd42f 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4402,6 +4402,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         new TimeValidator(TimeUnit.NANOSECONDS),
         "The length of the time window used for calculating executor metrics timed averages.\n" +
         "Currently used for ExecutorNumExecutorsAvailableAverage and ExecutorNumQueuedRequestsAverage\n"),
+    LLAP_DAEMON_METRICS_SIMPLE_AVERAGE_DATA_POINTS(
+        "hive.llap.daemon.metrics.simple.average.data.points", 0,
+        "The number of data points stored for calculating executor metrics simple averages.\n" +
+        "Currently used for AverageQueueTime and AverageResponseTime\n" +
+        "0 means that average calculation is turned off"),
     LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS(
         "hive.llap.task.communicator.connection.timeout.ms", "16000ms",
         new TimeValidator(TimeUnit.MILLISECONDS),
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
index 231c9629bf..b41d3f5d63 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
@@ -49,6 +49,18 @@ public void changeStateRunning(boolean isGuaranteed) {
     changeState(State.RUNNING, getRunningCounter(isGuaranteed));
   }
 
+  /** Returns the sum of the guaranteed and speculative queued counters. */
+  public long getQueueTime() {
+    return fixedCounters.get(LlapWmCounters.GUARANTEED_QUEUED_NS.ordinal())
+        + fixedCounters.get(LlapWmCounters.SPECULATIVE_QUEUED_NS.ordinal());
+  }
+
+  /** Returns the sum of the guaranteed and speculative running counters. */
+  public long getRunningTime() {
+    return fixedCounters.get(LlapWmCounters.GUARANTEED_RUNNING_NS.ordinal())
+        + fixedCounters.get(LlapWmCounters.SPECULATIVE_RUNNING_NS.ordinal());
+  }
+
   private static LlapWmCounters getQueuedCounter(boolean isGuaranteed) {
     return isGuaranteed
         ? LlapWmCounters.GUARANTEED_QUEUED_NS : LlapWmCounters.SPECULATIVE_QUEUED_NS;
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 3f3b15084c..bb6d1caeea 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -185,12 +185,16 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
         daemonConf, ConfVars.LLAP_DAEMON_METRICS_TIMED_WINDOW_AVERAGE_DATA_POINTS);
     long timedWindowAverageWindowLength = HiveConf.getTimeVar(
         daemonConf, ConfVars.LLAP_DAEMON_METRICS_TIMED_WINDOW_AVERAGE_WINDOW_LENGTH, TimeUnit.NANOSECONDS);
+    int simpleAverageWindowDataSize = HiveConf.getIntVar(
+        daemonConf, ConfVars.LLAP_DAEMON_METRICS_SIMPLE_AVERAGE_DATA_POINTS);
 
     Preconditions.checkArgument(timedWindowAverageDataPoints >= 0,
         "hive.llap.daemon.metrics.timed.window.average.data.points should be greater or equal to 0");
     Preconditions.checkArgument(timedWindowAverageDataPoints == 0 || timedWindowAverageWindowLength > 0,
         "hive.llap.daemon.metrics.timed.window.average.window.length should be greater than 0 if " +
         "hive.llap.daemon.metrics.average.timed.window.data.points is set fo greater than 0");
+    Preconditions.checkArgument(simpleAverageWindowDataSize >= 0,
+        "hive.llap.daemon.metrics.simple.average.data.points should be greater or equal to 0");
 
     final String logMsg = "Attempting to start LlapDaemon with the following configuration: " +
       "maxJvmMemory=" + maxJvmMemory + " ("
@@ -216,6 +220,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
       ", enablePreemption= " + enablePreemption +
       ", timedWindowAverageDataPoints= " + timedWindowAverageDataPoints +
       ", timedWindowAverageWindowLength= " + timedWindowAverageWindowLength +
+      ", simpleAverageWindowDataSize= " + simpleAverageWindowDataSize +
       ", versionInfo= (" + HiveVersionInfo.getBuildVersion() + ")";
     LOG.info(logMsg);
     final String currTSISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
@@ -278,7 +283,8 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
       }
     }
     this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors,
-        Ints.toArray(intervalList), timedWindowAverageDataPoints, timedWindowAverageWindowLength);
+        Ints.toArray(intervalList), timedWindowAverageDataPoints, timedWindowAverageWindowLength,
+        simpleAverageWindowDataSize);
     this.metrics.setMemoryPerInstance(executorMemoryPerInstance);
     this.metrics.setCacheMemoryPerInstance(ioMemoryBytes);
     this.metrics.setJvmMaxMemory(maxJvmMemory);
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 82bb06adfd..10d78bd1a3 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -929,6 +929,10 @@ public void onSuccess(TaskRunner2Result result) {
       taskWrapper.setIsInPreemptableQueue(false);
       taskWrapper.maybeUnregisterForFinishedStateNotifications();
       taskWrapper.getTaskRunnerCallable().setWmCountersDone();
+      if (metrics != null) {
+        metrics.addMetricsQueueTime(taskWrapper.getTaskRunnerCallable().getQueueTime());
+        metrics.addMetricsRunningTime(taskWrapper.getTaskRunnerCallable().getRunningTime());
+      }
       updatePreemptionListAndNotify(result.getEndReason());
       taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result);
     }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index e86a96c1e6..5dc62828d3 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -642,4 +642,20 @@ public void setWmCountersRunning() {
       wmCounters.changeStateRunning(isGuaranteed);
     }
   }
+
+  /** Returns the queue time reported by the WM counters, or 0 when no counters are set. */
+  public long getQueueTime() {
+    if (wmCounters != null) {
+      return wmCounters.getQueueTime();
+    }
+    return 0;
+  }
+
+  /** Returns the running time reported by the WM counters, or 0 when no counters are set. */
+  public long getRunningTime() {
+    if (wmCounters != null) {
+      return wmCounters.getRunningTime();
+    }
+    return 0;
+  }
 }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
index 1c7e0e419c..f06390cc4f 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
@@ -62,6 +62,8 @@
   ExecutorFallOffKilledTimeLost("Total time lost in an executor completing after informing the AM - killed fragments"),
   ExecutorFallOffKilledMaxTimeLost("Max value of time lost in an executor completing after informing the AM - killed fragments"),
   ExecutorFallOffNumCompletedFragments("Number of completed fragments w.r.t falloff values"),
+  AverageQueueTime("Average queue time for tasks"),
+  AverageResponseTime("Average response time for successful tasks"),
   ;
 
   private final String desc;
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
index 6e5f93882d..6fee9f06a7 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.llap.metrics;
 
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.AverageQueueTime;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.AverageResponseTime;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlots;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlotsPercent;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorCacheMemoryPerInstance;
@@ -62,6 +64,8 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
 import org.apache.hadoop.hive.common.JvmMetrics;
 import org.apache.hadoop.hive.llap.daemon.impl.ContainerRunnerImpl;
 import org.apache.hadoop.metrics2.MetricsCollector;
@@ -103,6 +107,9 @@
 
   private final Map executorNames;
 
+  private final DescriptiveStatistics queueTime;
+  private final DescriptiveStatistics runningTime;
+
   final MutableGaugeLong[] executorThreadCpuTime;
   final MutableGaugeLong[] executorThreadUserTime;
   @Metric
@@ -163,7 +170,7 @@
 
   private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId,
       int numExecutors, final int[] intervals, int timedWindowAverageDataPoints,
-      long timedWindowAverageWindowLength) {
+      long timedWindowAverageWindowLength, int simpleAverageWindowDataSize) {
     this.name = displayName;
     this.jvmMetrics = jm;
     this.sessionId = sessionId;
@@ -203,22 +210,30 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sess
       this.executorThreadUserTime[i] = registry.newGauge(miu, 0L);
       this.executorNames.put(ContainerRunnerImpl.THREAD_NAME_FORMAT_PREFIX + i, i);
     }
+
     if (timedWindowAverageDataPoints > 0) {
       this.executorNumQueuedRequestsAverage =
           new TimedAverageMetrics(timedWindowAverageDataPoints, timedWindowAverageWindowLength);
       this.numExecutorsAvailableAverage =
           new TimedAverageMetrics(timedWindowAverageDataPoints, timedWindowAverageWindowLength);
     }
+    if (simpleAverageWindowDataSize > 0) {
+      this.queueTime = new SynchronizedDescriptiveStatistics(simpleAverageWindowDataSize);
+      this.runningTime = new SynchronizedDescriptiveStatistics(simpleAverageWindowDataSize);
+    } else {
+      this.queueTime = null;
+      this.runningTime = null;
+    }
   }
 
   public static LlapDaemonExecutorMetrics create(String displayName, String sessionId,
       int numExecutors, final int[] intervals, int timedWindowAverageDataPoints,
-      long timedWindowAverageWindowLength) {
+      long timedWindowAverageWindowLength, int simpleAverageWindowDataSize) {
     MetricsSystem ms = LlapMetricsSystem.instance();
     JvmMetrics jm = JvmMetrics.create(MetricsUtils.METRICS_PROCESS_NAME, sessionId, ms);
     return ms.register(displayName, "LlapDaemon Executor Metrics",
         new LlapDaemonExecutorMetrics(displayName, jm, sessionId, numExecutors, intervals,
-            timedWindowAverageDataPoints, timedWindowAverageWindowLength));
+            timedWindowAverageDataPoints, timedWindowAverageWindowLength, simpleAverageWindowDataSize));
   }
 
   @Override
@@ -325,6 +340,20 @@ public void incrExecutorTotalKilled() {
     executorTotalIKilled.incr();
   }
 
+  /** Adds one queue time sample; ignored when simple average calculation is turned off. */
+  public void addMetricsQueueTime(long queueTime) {
+    if (this.queueTime != null) {
+      this.queueTime.addValue(queueTime);
+    }
+  }
+
+  /** Adds one running time sample; ignored when simple average calculation is turned off. */
+  public void addMetricsRunningTime(long runningTime) {
+    if (this.runningTime != null) {
+      this.runningTime.addValue(runningTime);
+    }
+  }
+
   public void setCacheMemoryPerInstance(long value) {
     cacheMemoryPerInstance.set(value);
   }
@@ -384,6 +413,14 @@ private void getExecutorStats(MetricsRecordBuilder rb) {
       rb.addGauge(ExecutorNumQueuedRequestsAverage,
           executorNumQueuedRequestsAverage.value());
     }
+    // Publish the averages only once a sample exists: the mean of an empty window is NaN.
+    if (queueTime != null && queueTime.getN() > 0) {
+      rb.addGauge(AverageQueueTime, queueTime.getMean());
+    }
+    if (runningTime != null && runningTime.getN() > 0) {
+      rb.addGauge(AverageResponseTime, runningTime.getMean());
+    }
+
     for (MutableQuantiles q : percentileTimeToKill) {
       q.snapshot(rb, true);
     }
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
index 095eae21e7..b3f4545b53 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java
@@ -99,7 +99,7 @@ public void setup() throws Exception {
 
     this.metrics = LlapDaemonExecutorMetrics
         .create("ContinerRunerTests", MetricsUtils.getUUID(), numExecutors,
-            Ints.toArray(intervalList), 0, 0L);
+            Ints.toArray(intervalList), 0, 0L, 0);
 
     for (int i = 0; i < numLocalDirs; i++) {
       File f = new File(testWorkDir, "localDir");
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
index afb13bf00f..91f8cd32bb 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
@@ -84,7 +84,7 @@ public void testGetDaemonMetrics() throws ServiceException, IOException {
     LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration();
     int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
     LlapDaemonExecutorMetrics executorMetrics =
-        LlapDaemonExecutorMetrics.create("LLAP", "SessionId", numHandlers, new int[] {30, 60, 300}, 0, 0L);
+        LlapDaemonExecutorMetrics.create("LLAP", "SessionId", numHandlers, new int[] {30, 60, 300}, 0, 0L, 0);
     LlapProtocolServerImpl server =
         new LlapProtocolServerImpl(null, numHandlers, null,
             new AtomicReference(), new AtomicReference(),