diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1c0aa2a242..7ea06b5467 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4384,6 +4384,14 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Whether non-finishable running tasks (e.g. a reducer waiting for inputs) should be\n" + "preempted by finishable tasks inside LLAP scheduler.", "llap.daemon.task.scheduler.enable.preemption"), + LLAP_DAEMON_METRICS_AVERAGE_DATA_SIZE( + "hive.llap.daemon.metrics.average.data.size", 0, + "The number of data points stored for calculating executor metrics averages.\n" + + "0 means that average calculation is turned off"), + LLAP_DAEMON_METRICS_AVERAGE_TIME_WINDOW( + "hive.llap.daemon.metrics.average.time.window", "1m", + new TimeValidator(TimeUnit.NANOSECONDS), + "The length of the window used for calculating executor metrics averages.\n"), LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS( "hive.llap.task.communicator.connection.timeout.ms", "16000ms", new TimeValidator(TimeUnit.MILLISECONDS), diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 14dc6326f2..f5752341bc 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -26,6 +26,7 @@ import java.util.Date; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -180,6 +181,17 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE); boolean enablePreemption = HiveConf.getBoolVar( daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION); + int averageWindowDataSize = HiveConf.getIntVar( + daemonConf, ConfVars.LLAP_DAEMON_METRICS_AVERAGE_DATA_SIZE); + long averageWindowTimeSize = HiveConf.getTimeVar( + daemonConf, ConfVars.LLAP_DAEMON_METRICS_AVERAGE_TIME_WINDOW, TimeUnit.NANOSECONDS); + + Preconditions.checkArgument(averageWindowDataSize >= 0, + "hive.llap.daemon.metrics.average.data.size should be greater or equal to 0"); + Preconditions.checkArgument(averageWindowDataSize == 0 || averageWindowTimeSize > 0, + "hive.llap.daemon.metrics.average.time.window should be greater than 0 if " + + "hive.llap.daemon.metrics.average.data.size is set"); + final String logMsg = "Attempting to start LlapDaemon with the following configuration: " + "maxJvmMemory=" + maxJvmMemory + " (" + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" + @@ -202,6 +214,8 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor ", shufflePort=" + shufflePort + ", waitQueueSize= " + waitQueueSize + ", enablePreemption= " + enablePreemption + + ", averageWindowDataSize= " + averageWindowDataSize + + ", averageWindowTimeSize= " + averageWindowTimeSize + ", versionInfo= (" + HiveVersionInfo.getBuildVersion() + ")"; LOG.info(logMsg); final String currTSISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date()); @@ -264,7 +278,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor } } this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors, - Ints.toArray(intervalList)); + Ints.toArray(intervalList), averageWindowDataSize, averageWindowTimeSize); this.metrics.setMemoryPerInstance(executorMemoryPerInstance); this.metrics.setCacheMemoryPerInstance(ioMemoryBytes); this.metrics.setJvmMaxMemory(maxJvmMemory); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java index 230cee5941..1c7e0e419c 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java @@ -30,6 +30,7 @@ ExecutorMaxFreeSlots("Sum of wait queue size and number of executors"), ExecutorNumExecutorsPerInstance("Total number of executor threads per node"), ExecutorNumExecutorsAvailable("Total number of executor threads per node that are free"), + ExecutorNumExecutorsAvailableAverage("Total number of executor threads per node that are free averaged over time"), ExecutorAvailableFreeSlots("Number of free slots available"), ExecutorAvailableFreeSlotsPercent("Percent of free slots available"), ExecutorThreadCPUTime("Cpu time in nanoseconds"), @@ -40,6 +41,7 @@ ExecutorThreadUserTime("User time in nanoseconds"), ExecutorTotalRequestsHandled("Total number of requests handled by the container"), ExecutorNumQueuedRequests("Number of requests queued by the container for processing"), + ExecutorNumQueuedRequestsAverage("Number of requests queued by the container for processing averaged over time"), ExecutorNumPreemptableRequests("Number of queued requests that are pre-emptable"), ExecutorTotalRejectedRequests("Total number of requests rejected as wait queue being full"), ExecutorTotalSuccess("Total number of requests handled by the container that succeeded"), diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java index 5129b93249..55ef0e305f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java @@ -27,8 +27,10 @@ import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeToKill; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMemoryPerInstance; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailable; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumPreemptableRequests; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumQueuedRequests; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumQueuedRequestsAverage; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadCPUTime; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsPerInstance; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadUserTime; @@ -54,9 +56,11 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import org.apache.hadoop.hive.common.JvmMetrics; import org.apache.hadoop.hive.llap.daemon.impl.ContainerRunnerImpl; @@ -94,6 +98,9 @@ private long fallOffMaxFailedTimeLostLong = 0L; private long fallOffMaxKilledTimeLostLong = 0L; + private TimedAverageMetrics executorNumQueuedRequestsAverage; + private TimedAverageMetrics numExecutorsAvailableAverage; + private final Map executorNames; final MutableGaugeLong[] executorThreadCpuTime; @@ -155,7 +162,8 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId, - int numExecutors, final int[] intervals) { + int numExecutors, final int[] intervals, int averageWindowDataSize, + long averageWindowTimeSize) { this.name = displayName; this.jvmMetrics = jm; this.sessionId = sessionId; @@ -195,14 +203,22 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sess this.executorThreadUserTime[i] = registry.newGauge(miu, 0L); this.executorNames.put(ContainerRunnerImpl.THREAD_NAME_FORMAT_PREFIX + i, i); } + if (averageWindowDataSize > 0) { + this.executorNumQueuedRequestsAverage = new TimedAverageMetrics(averageWindowDataSize, + averageWindowTimeSize); + this.numExecutorsAvailableAverage = new TimedAverageMetrics(averageWindowDataSize, + averageWindowTimeSize); + } } public static LlapDaemonExecutorMetrics create(String displayName, String sessionId, - int numExecutors, final int[] intervals) { + int numExecutors, final int[] intervals, int averageWindowDataSize, + long averageWindowTimeSize) { MetricsSystem ms = LlapMetricsSystem.instance(); JvmMetrics jm = JvmMetrics.create(MetricsUtils.METRICS_PROCESS_NAME, sessionId, ms); return ms.register(displayName, "LlapDaemon Executor Metrics", - new LlapDaemonExecutorMetrics(displayName, jm, sessionId, numExecutors, intervals)); + new LlapDaemonExecutorMetrics(displayName, jm, sessionId, numExecutors, intervals, + averageWindowDataSize, averageWindowTimeSize)); } @Override @@ -220,6 +236,9 @@ public void incrExecutorTotalRequestsHandled() { public void setExecutorNumQueuedRequests(int value) { executorNumQueuedRequests.set(value); + if (executorNumQueuedRequestsAverage != null) { + executorNumQueuedRequestsAverage.add(value); + } } public void setExecutorNumPreemptableRequests(int value) { @@ -228,6 +247,9 @@ public void setExecutorNumPreemptableRequests(int value) { public void setNumExecutorsAvailable(int value) { numExecutorsAvailable.set(value); + if (numExecutorsAvailableAverage != null) { + numExecutorsAvailableAverage.add(value); + } } public void incrTotalEvictedFromWaitQueue() { @@ -355,6 +377,12 @@ private void getExecutorStats(MetricsRecordBuilder rb) { .addCounter(ExecutorFallOffKilledTimeLost, fallOffKilledTimeLost.value()) .addGauge(ExecutorFallOffKilledMaxTimeLost, fallOffMaxKilledTimeLost.value()) .addCounter(ExecutorFallOffNumCompletedFragments, fallOffNumCompletedFragments.value()); + if (numExecutorsAvailableAverage != null) { + rb.addGauge(ExecutorNumExecutorsAvailableAverage, numExecutorsAvailableAverage.value()); + } + if (executorNumQueuedRequestsAverage != null) { + rb.addGauge(ExecutorNumQueuedRequestsAverage, executorNumQueuedRequestsAverage.value()); + } for (MutableQuantiles q : percentileTimeToKill) { q.snapshot(rb, true); @@ -397,4 +425,104 @@ public JvmMetrics getJvmMetrics() { public String getName() { return name; } + + /** + * Generate time aware average for data points. + * For example if we have 3s when the queue size is 1, and 1s when the queue size is 2 then the + * calculated average should be (3*1+1*2)/4 = 1.25. + */ + @VisibleForTesting + static class TimedAverageMetrics { + private final int windowDataSize; + private final long windowTimeSize; + private final Data[] data; + private int nextPos = 0; + + /** + * Creates and initializes the metrics object. + * @param windowDataSize The maximum number of samples stored + * @param windowTimeSize The time window used to generate the average in nanoseconds + */ + TimedAverageMetrics(int windowDataSize, long windowTimeSize) { + this(windowDataSize, windowTimeSize, System.nanoTime() - windowTimeSize - 1); + } + + @VisibleForTesting + TimedAverageMetrics(int windowDataSize, long windowTimeSize, + long defaultTime) { + assert(windowDataSize > 0); + this.windowDataSize = windowDataSize; + this.windowTimeSize = windowTimeSize; + this.data = new Data[windowDataSize]; + Arrays.setAll(data, i -> new Data(defaultTime, 0L)); + } + + /** + * Adds a new sample value to the metrics. + * @param value The new sample value + */ + public synchronized void add(long value) { + add(System.nanoTime(), value); + } + + /** + * Calculates the average for the last windowTimeSize window. + * @return The average + */ + public synchronized long value() { + return value(System.nanoTime()); + } + + @VisibleForTesting + void add(long time, long value) { + data[nextPos].nanoTime = time; + data[nextPos].value = value; + nextPos++; + if (nextPos == windowDataSize) { + nextPos = 0; + } + } + + @VisibleForTesting + long value(long time) { + // We expect that the data time positions are strictly increasing and the time is greater than + // any of the data position time. This is ensured by using System.nanoTime(). + long sum = 0L; + long lastTime = time; + long minTime = lastTime - windowTimeSize; + int pos = nextPos - 1; + do { + // Loop the window + if (pos < 0) { + pos = windowDataSize - 1; + } + // If we are at the end of the window + if (data[pos].nanoTime < minTime) { + sum += (lastTime - minTime) * data[pos].value; + break; + } + sum += (lastTime - data[pos].nanoTime) * data[pos].value; + lastTime = data[pos].nanoTime; + pos--; + } while (pos != nextPos - 1); + // If we exited the loop and we did not have enough data point estimate the data with the last + // known point + if (pos == nextPos - 1 && data[nextPos].nanoTime > minTime) { + sum += (lastTime - minTime) * data[nextPos].value; + } + return Math.round((double)sum / (double)windowTimeSize); + } + } + + /** + * Single sample data. + */ + private static class Data { + private long nanoTime; + private long value; + Data(long nanoTime, long value) { + this.nanoTime = nanoTime; + this.value = value; + } + } } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java index ea4f50b4d0..095eae21e7 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java @@ -99,7 +99,7 @@ public void setup() throws Exception { this.metrics = LlapDaemonExecutorMetrics .create("ContinerRunerTests", MetricsUtils.getUUID(), numExecutors, - Ints.toArray(intervalList)); + Ints.toArray(intervalList), 0, 0L); for (int i = 0; i < numLocalDirs; i++) { File f = new File(testWorkDir, "localDir"); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/metrics/TestLlapDaemonExecutorMetrics.java llap-server/src/test/org/apache/hadoop/hive/llap/metrics/TestLlapDaemonExecutorMetrics.java new file mode 100644 index 0000000000..4942a32035 --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/metrics/TestLlapDaemonExecutorMetrics.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.metrics; + +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics.TimedAverageMetrics; + +/** + * Test class for LlapDaemonExecutorMetrics. + */ +public class TestLlapDaemonExecutorMetrics { + + /** + * Test generated values for TimedAverageMetrics when the time window is smaller than the amount + * of data we have stored. + */ + @Test + public void testTimedAverageMetricsTimeBound() { + TimedAverageMetrics metrics; + + // Window 1 + metrics = generateTimedAverageMetrics(1, 10, 1, 100, 0, 0, 1); + assertEquals("Window size 1", 100, metrics.value(100)); + + // Window 1 with multiple data + metrics = generateTimedAverageMetrics(1, 10, 50, 100, 0, 0, 1); + assertEquals("Window size 1 with multiple data", 100, metrics.value(100)); + + // Single point in the middle + metrics = generateTimedAverageMetrics(10, 100, 1, 100, 0, 50, 1); + assertEquals("Single point in the middle", 50, metrics.value(100)); + + // Single point at 3/4 + metrics = generateTimedAverageMetrics(10, 100, 1, 100, 0, 75, 1); + assertEquals("Single point at 3/4", 25, metrics.value(100)); + + // Single point at 1/4 + metrics = generateTimedAverageMetrics(10, 100, 1, 100, 0, 25, 1); + assertEquals("Single point at 1/4", 75, metrics.value(100)); + + // Multiple points after 1/4 + metrics = generateTimedAverageMetrics(10, 100, 3, 100, 0, 25, 25); + assertEquals("Multiple points after 1/4", 75, metrics.value(100)); + + // More points with overflow + metrics = generateTimedAverageMetrics(10, 100, 18, 100, 0, 25, 25); + assertEquals("More points with overflow", 100, metrics.value(450)); + + // Very old points + metrics = generateTimedAverageMetrics(10, 100, 20, 100, 0, 25, 25); + assertEquals("Very old points", 100, metrics.value(5000)); + metrics.add(1000, 10); + assertEquals("Very old points but not that old", 10, metrics.value(5000)); + } + + /** + * Test generated values for TimedAverageMetrics when we have less data points than the window. + */ + @Test + public void testTimedAverageMetricsDataBound() { + TimedAverageMetrics metrics; + + // Window 1 + metrics = generateTimedAverageMetrics(1, 100, 1, 100, 0, 50, 1); + assertEquals("Window size 1", 100, metrics.value(100)); + + // Overflow at the bottom + metrics = generateTimedAverageMetrics(3, 100, 4, 50, 10, 50, 10); + assertEquals("Window size 1 with multiple data", 65, metrics.value(100)); + + } + + /** + * Test that TimedAverageMetrics throws an exception if the window size is 0. + */ + @Test(expected = AssertionError.class) + public void testTimedAverageMetricsWindowSizeZero() { + generateTimedAverageMetrics(0, 100, 2, 50, 50, 0, 50); + } + + /** + * Test TimedAverageMetrics with changing data to see that we handle array edge cases correctly. + */ + @Test + public void testTimedAverageMetricsChanging() { + TimedAverageMetrics metrics; + + metrics = generateTimedAverageMetrics(3, 30, 6, 0, 10, 0, 10); + assertEquals("Position 0", 40, metrics.value(60)); + + metrics = generateTimedAverageMetrics(3, 30, 5, 0, 10, 0, 10); + assertEquals("Position windowDataSize - 1", 30, metrics.value(50)); + } + + /** + * Test the real interfaces of the TimedAverageMetrics. + */ + @Test + public void testTimedAverageMetricsReal() { + TimedAverageMetrics metrics = + new TimedAverageMetrics(10, 6 * 1000 * 1000); + + for (int i = 0; i < 50; i++) { + metrics.add(100); + try { + Thread.sleep(10); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + assertEquals("Checking the calculated value", 100, metrics.value()); + } + + /** + * Check for the maximum expected amount of data for TimedAverageMetrics. + * 15000 data point / 10 minute window / data every 50 ms. + */ + @Test + public void testTimedAverageMetricsBigData() { + long currentTime = System.nanoTime(); + // Data range in metrics from [0 / 250s] till [14999 / (1000s - 50ms)] + TimedAverageMetrics metrics = generateTimedAverageMetrics(15000, + 10L * 60L * 1000L * 1000L * 1000L, 20000, -5000, 1, currentTime, 50L * 1000L * 1000L); + + // Checking value from [3000/600s] - [14999/1000s] -> 8999.5 + assertEquals("Checking the calculated value", 9000, metrics.value(currentTime + 50L * 1000L * 1000L * 20000L)); + } + + private TimedAverageMetrics generateTimedAverageMetrics(int windowDataSize, long windowTimeSize, int dataNum, + long firstData, long dataDelta, long firstTime, long timeDelta) { + TimedAverageMetrics metrics = + new TimedAverageMetrics(windowDataSize, windowTimeSize, firstTime - windowTimeSize); + + for (int i = 0; i < dataNum; i++) { + metrics.add(firstTime + i * timeDelta, firstData + i * dataDelta); + } + return metrics; + } +} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/metrics/package-info.java llap-server/src/test/org/apache/hadoop/hive/llap/metrics/package-info.java new file mode 100644 index 0000000000..4902515f5b --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/metrics/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Test classes for metrics package. + */ + +package org.apache.hadoop.hive.llap.metrics;