diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7d6dec9..951a604 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2033,6 +2033,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "Chooses whether query fragments will run in container or in llap"), LLAP_OBJECT_CACHE_ENABLED("hive.llap.object.cache.enabled", true, "Cache objects (plans, hashtables, etc) in llap"), + LLAP_QUEUE_METRICS_PERCENTILE_INTERVALS("hive.llap.queue.metrics.percentiles.intervals", "", + "Comma-delimited set of integers denoting the desired rollover intervals (in seconds) for " + + "percentile latency metrics on the LLAP daemon producer-consumer queue. By default, " + + "percentile latency metrics are disabled."), SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index d3f82dc..2bf2ed9 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -22,7 +22,6 @@ import javax.management.ObjectName; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; @@ -39,6 +38,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Logger; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -119,6 +119,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor pauseMonitor.start(); String displayName = "LlapDaemonExecutorMetrics-" + MetricsUtils.getHostName(); 
String sessionId = MetricsUtils.getUUID(); + daemonConf.set("llap.daemon.metrics.sessionid", sessionId); this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors); metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); this.llapDaemonInfoBean = MBeans.register("LlapDaemon", "LlapDaemonInfo", this); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 6844292..2f15c31 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -44,13 +44,12 @@ import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer; import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics; import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.util.JvmPauseMonitor; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -62,8 +61,8 @@ private final ColumnVectorProducer cvp; private final ListeningExecutorService executor; private final Configuration conf; - private LlapDaemonCacheMetrics metrics; - private JvmPauseMonitor pauseMonitor; + private LlapDaemonCacheMetrics cacheMetrics; + private LlapDaemonQueueMetrics queueMetrics; private ObjectName buddyAllocatorMXBean; private Allocator allocator; @@ -76,10 +75,15 @@ private LlapIoImpl(Configuration conf) throws IOException { } String displayName = "LlapDaemonCacheMetrics-" + 
MetricsUtils.getHostName(); - // TODO: Find a better way to pass in session id - String sessionId = conf.get("llap.daemon.sessionid"); - this.metrics = LlapDaemonCacheMetrics.create(displayName, sessionId); - LOG.info("Started LlapDaemonCacheMetrics with displayName: " + displayName + + String sessionId = conf.get("llap.daemon.metrics.sessionid"); + this.cacheMetrics = LlapDaemonCacheMetrics.create(displayName, sessionId); + + displayName = "LlapDaemonQueueMetrics-" + MetricsUtils.getHostName(); + int[] intervals = conf.getInts(String.valueOf( + HiveConf.ConfVars.LLAP_QUEUE_METRICS_PERCENTILE_INTERVALS)); + this.queueMetrics = LlapDaemonQueueMetrics.create(displayName, sessionId, intervals); + + LOG.info("Started llap daemon metrics with displayName: " + displayName + " sessionId: " + sessionId); Cache cache = useLowLevelCache ? null : new NoopCache(); @@ -88,12 +92,13 @@ private LlapIoImpl(Configuration conf) throws IOException { useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf); OrcMetadataCache metadataCache = new OrcMetadataCache(); LowLevelCacheImpl orcCache = createLowLevelCache( - conf, cachePolicy, metadataCache, metrics, useLowLevelCache); + conf, cachePolicy, metadataCache, cacheMetrics, useLowLevelCache); // Arbitrary thread pool. Listening is used for unhandled errors for now (TODO: remove?) executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); // TODO: this should depends on input format and be in a map, or something. 
- this.cvp = new OrcColumnVectorProducer(metadataCache, orcCache, cache, conf, metrics); + this.cvp = new OrcColumnVectorProducer(metadataCache, orcCache, cache, conf, cacheMetrics, + queueMetrics); if (LOGL.isInfoEnabled()) { LOG.info("LLAP IO initialized"); } @@ -128,17 +133,17 @@ private LowLevelCacheImpl createLowLevelCache(Configuration conf, return new LlapInputFormat(sourceInputFormat, cvp, executor); } - public LlapDaemonCacheMetrics getMetrics() { - return metrics; + public LlapDaemonCacheMetrics getCacheMetrics() { + return cacheMetrics; + } + + public LlapDaemonQueueMetrics getQueueMetrics() { + return queueMetrics; } @Override public void close() { LOG.info("Closing LlapIoImpl.."); - if (pauseMonitor != null) { - pauseMonitor.stop(); - } - if (buddyAllocatorMXBean != null) { MBeans.unregister(buddyAllocatorMXBean); buddyAllocatorMXBean = null; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java index 270a139..ef2d943 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics; /** * @@ -40,10 +41,13 @@ private final Consumer downstreamConsumer; private Callable readCallable; private final int colCount; + private final LlapDaemonQueueMetrics queueMetrics; - public EncodedDataConsumer(Consumer consumer, int colCount) { + public EncodedDataConsumer(Consumer consumer, int colCount, + LlapDaemonQueueMetrics queueMetrics) { this.downstreamConsumer = consumer; this.colCount = colCount; + this.queueMetrics = queueMetrics; } public 
void init(ConsumerFeedback upstreamFeedback, @@ -70,6 +74,7 @@ public void consumeData(EncodedColumnBatch data) { pendingData.put(data.batchKey, data); } } + queueMetrics.setQueueSize(pendingData.size()); } if (localIsStopped) { returnProcessed(data.columnData); @@ -98,7 +103,10 @@ public void consumeData(EncodedColumnBatch data) { return; } if (0 == targetBatch.colsRemaining) { + long start = System.currentTimeMillis(); decodeBatch(targetBatch, downstreamConsumer); + long end = System.currentTimeMillis(); + queueMetrics.addProcessingTime(end - start); // Batch has been decoded; unlock the buffers in cache returnProcessed(targetBatch.columnData); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java index 1f20f0c..7db60e0 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader; import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.mapred.InputSplit; @@ -43,11 +44,12 @@ private final LowLevelCache lowLevelCache; private final Configuration conf; private boolean _skipCorrupt; // TODO: get rid of this - private LlapDaemonCacheMetrics metrics; + private LlapDaemonCacheMetrics cacheMetrics; + private LlapDaemonQueueMetrics queueMetrics; public OrcColumnVectorProducer(OrcMetadataCache metadataCache, LowLevelCacheImpl lowLevelCache, Cache cache, Configuration conf, - LlapDaemonCacheMetrics metrics) { + LlapDaemonCacheMetrics metrics, LlapDaemonQueueMetrics queueMetrics) { if 
(LlapIoImpl.LOGL.isInfoEnabled()) { LlapIoImpl.LOG.info("Initializing ORC column vector producer"); } @@ -57,7 +59,8 @@ public OrcColumnVectorProducer(OrcMetadataCache metadataCache, this.cache = cache; this.conf = conf; this._skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA); - this.metrics = metrics; + this.cacheMetrics = metrics; + this.queueMetrics = queueMetrics; } @Override @@ -65,9 +68,9 @@ public ReadPipeline createReadPipeline( Consumer consumer, InputSplit split, List columnIds, SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters) { - metrics.incrCacheReadRequests(); + cacheMetrics.incrCacheReadRequests(); OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(), - _skipCorrupt, counters); + _skipCorrupt, counters, queueMetrics); OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, cache, metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters); edc.init(reader, reader); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index 913addc..91fbfc7 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.TimestampStreamReader; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; @@ -58,8 +59,8 @@ public OrcEncodedDataConsumer( Consumer consumer, int colCount, boolean skipCorrupt, - 
QueryFragmentCounters counters) { - super(consumer, colCount); + QueryFragmentCounters counters, LlapDaemonQueueMetrics queueMetrics) { + super(consumer, colCount, queueMetrics); this.skipCorrupt = skipCorrupt; this.counters = counters; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index d00cd1f..4d971e7 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcProto; -import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java index 6775985..8f0e9d8 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java @@ -25,16 +25,16 @@ * Metrics information for llap cache. 
*/ public enum LlapDaemonCacheInfo implements MetricsInfo { - LLAP_DAEMON_CACHE_METRICS("Llap daemon cache related metrics"), - CACHE_CAPACITY_REMAINING("Amount of memory available in cache in bytes"), - CACHE_CAPACITY_TOTAL("Total amount of memory allocated for cache in bytes"), - CACHE_CAPACITY_USED("Amount of memory used in cache in bytes"), - CACHE_REQUESTED_BYTES("Disk ranges that are requested in bytes"), - CACHE_HIT_BYTES("Disk ranges that are cached in bytes"), - CACHE_HIT_RATIO("Ratio of disk ranges cached vs requested"), - CACHE_READ_REQUESTS("Number of disk range requests to cache"), - CACHE_ALLOCATED_ARENA("Number of arenas allocated"), - CACHE_NUM_LOCKED_BUFFERS("Number of locked buffers in cache"); + CacheMetrics("Llap daemon cache related metrics"), + CacheCapacityRemaining("Amount of memory available in cache in bytes"), + CacheCapacityTotal("Total amount of memory allocated for cache in bytes"), + CacheCapacityUsed("Amount of memory used in cache in bytes"), + CacheRequestedBytes("Disk ranges that are requested in bytes"), + CacheHitBytes("Disk ranges that are cached in bytes"), + CacheHitRatio("Ratio of disk ranges cached vs requested"), + CacheReadRequests("Number of disk range requests to cache"), + CacheAllocatedArena("Number of arenas allocated"), + CacheNumLockedBuffers("Number of locked buffers in cache"); private final String desc; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java index 1aabcbf..4453e92 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java @@ -17,16 +17,16 @@ */ package org.apache.hadoop.hive.llap.metrics; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_ALLOCATED_ARENA; -import static 
org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_CAPACITY_REMAINING; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_CAPACITY_TOTAL; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_CAPACITY_USED; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_HIT_BYTES; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_HIT_RATIO; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_NUM_LOCKED_BUFFERS; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_READ_REQUESTS; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CACHE_REQUESTED_BYTES; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.LLAP_DAEMON_CACHE_METRICS; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheAllocatedArena; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheCapacityRemaining; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheCapacityTotal; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheCapacityUsed; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheHitBytes; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheHitRatio; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheNumLockedBuffers; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheReadRequests; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheRequestedBytes; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheMetrics; import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; @@ -42,10 +42,9 @@ /** * Llap daemon cache metrics source. 
*/ -@Metrics(about = "LlapDaemon Cache Metrics", context = "llap") +@Metrics(about = "LlapDaemon Cache Metrics", context = MetricsUtils.METRICS_CONTEXT) public class LlapDaemonCacheMetrics implements MetricsSource { final String name; - // TODO: SessionId should come from llap daemon. For now using random UUID. private String sessionId; private final MetricsRegistry registry; @@ -68,7 +67,7 @@ private LlapDaemonCacheMetrics(String name, String sessionId) { this.name = name; this.sessionId = sessionId; this.registry = new MetricsRegistry("LlapDaemonCacheRegistry"); - this.registry.tag(ProcessName, "LlapDaemon").tag(SessionId, sessionId); + this.registry.tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME).tag(SessionId, sessionId); } public static LlapDaemonCacheMetrics create(String displayName, String sessionId) { @@ -118,8 +117,9 @@ public String getName() { @Override public void getMetrics(MetricsCollector collector, boolean b) { - MetricsRecordBuilder rb = collector.addRecord(LLAP_DAEMON_CACHE_METRICS) - .setContext("llap").tag(ProcessName, "LlapDaemon") + MetricsRecordBuilder rb = collector.addRecord(CacheMetrics) + .setContext(MetricsUtils.METRICS_CONTEXT) + .tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME) .tag(SessionId, sessionId); getCacheStats(rb); } @@ -128,15 +128,15 @@ private void getCacheStats(MetricsRecordBuilder rb) { float cacheHitRatio = cacheRequestedBytes.value() == 0 ? 
0.0f : (float) cacheHitBytes.value() / (float) cacheRequestedBytes.value(); - rb.addCounter(CACHE_CAPACITY_REMAINING, cacheCapacityTotal.value() - cacheCapacityUsed.value()) - .addCounter(CACHE_CAPACITY_TOTAL, cacheCapacityTotal.value()) - .addCounter(CACHE_CAPACITY_USED, cacheCapacityUsed.value()) - .addCounter(CACHE_READ_REQUESTS, cacheReadRequests.value()) - .addCounter(CACHE_REQUESTED_BYTES, cacheRequestedBytes.value()) - .addCounter(CACHE_HIT_BYTES, cacheHitBytes.value()) - .addCounter(CACHE_ALLOCATED_ARENA, cacheAllocatedArena.value()) - .addCounter(CACHE_NUM_LOCKED_BUFFERS, cacheNumLockedBuffers.value()) - .addGauge(CACHE_HIT_RATIO, cacheHitRatio); + rb.addCounter(CacheCapacityRemaining, cacheCapacityTotal.value() - cacheCapacityUsed.value()) + .addCounter(CacheCapacityTotal, cacheCapacityTotal.value()) + .addCounter(CacheCapacityUsed, cacheCapacityUsed.value()) + .addCounter(CacheReadRequests, cacheReadRequests.value()) + .addCounter(CacheRequestedBytes, cacheRequestedBytes.value()) + .addCounter(CacheHitBytes, cacheHitBytes.value()) + .addCounter(CacheAllocatedArena, cacheAllocatedArena.value()) + .addCounter(CacheNumLockedBuffers, cacheNumLockedBuffers.value()) + .addGauge(CacheHitRatio, cacheHitRatio); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonContainerRunnerInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonContainerRunnerInfo.java deleted file mode 100644 index 8e11ab1..0000000 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonContainerRunnerInfo.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.llap.metrics; - -/** - * - */ -public class LlapDaemonContainerRunnerInfo { -} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCustomMetricsInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCustomMetricsInfo.java index 964ef47..35d6064 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCustomMetricsInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCustomMetricsInfo.java @@ -20,7 +20,7 @@ import org.apache.hadoop.metrics2.MetricsInfo; /** - * + * Custom MetricsInfo to provide custom name for the metrics. */ public class LlapDaemonCustomMetricsInfo implements MetricsInfo { private String name; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java index 154e723..d7bed53 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java @@ -25,15 +25,15 @@ * Metrics information for llap daemon container. 
*/ public enum LlapDaemonExecutorInfo implements MetricsInfo { - LLAP_DAEMON_EXECUTOR_METRICS("Llap daemon cache related metrics"), - EXECUTOR_THREAD_CPU_TIME("Cpu time in nanoseconds"), - EXECUTOR_THREAD_USER_TIME("User time in nanoseconds"), - EXECUTOR_TOTAL_REQUESTS_HANDLED("Total number of requests handled by the container"), - EXECUTOR_NUM_QUEUED_REQUESTS("Number of requests queued by the container for processing"), - EXECUTOR_TOTAL_SUCCESS("Total number of requests handled by the container that succeeded"), - EXECUTOR_TOTAL_EXECUTION_FAILURE("Total number of requests handled by the container that failed execution"), - EXECUTOR_TOTAL_INTERRUPTED("Total number of requests handled by the container that got interrupted"), - EXECUTOR_TOTAL_ASKED_TO_DIE("Total number of requests handled by the container that were asked to die"); + ExecutorMetrics("Llap daemon cache related metrics"), + ExecutorThreadCPUTime("Cpu time in nanoseconds"), + ExecutorThreadUserTime("User time in nanoseconds"), + ExecutorTotalRequestsHandled("Total number of requests handled by the container"), + ExecutorNumQueuedRequests("Number of requests queued by the container for processing"), + ExecutorTotalSuccess("Total number of requests handled by the container that succeeded"), + ExecutorTotalExecutionFailure("Total number of requests handled by the container that failed execution"), + ExecutorTotalInterrupted("Total number of requests handled by the container that got interrupted"), + ExecutorTotalAskedToDie("Total number of requests handled by the container that were asked to die"); private final String desc; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java index 80ee7a1..22c9fe0 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java +++ 
llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java @@ -17,15 +17,15 @@ */ package org.apache.hadoop.hive.llap.metrics; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_NUM_QUEUED_REQUESTS; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_THREAD_CPU_TIME; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_THREAD_USER_TIME; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_TOTAL_ASKED_TO_DIE; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_TOTAL_EXECUTION_FAILURE; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_TOTAL_INTERRUPTED; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_TOTAL_REQUESTS_HANDLED; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_TOTAL_SUCCESS; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.LLAP_DAEMON_EXECUTOR_METRICS; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumQueuedRequests; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadCPUTime; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadUserTime; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalAskedToDie; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalExecutionFailure; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalInterrupted; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalSuccess; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMetrics; import static 
org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; @@ -51,7 +51,7 @@ /** * Metrics about the llap daemon executors. */ -@Metrics(about = "LlapDaemon Executor Metrics", context = "llap") +@Metrics(about = "LlapDaemon Executor Metrics", context = MetricsUtils.METRICS_CONTEXT) public class LlapDaemonExecutorMetrics implements MetricsSource { private final String name; @@ -84,7 +84,7 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sess this.jvmMetrics = jm; this.sessionId = sessionId; this.registry = new MetricsRegistry("LlapDaemonExecutorRegistry"); - this.registry.tag(ProcessName, "LlapDaemon").tag(SessionId, sessionId); + this.registry.tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME).tag(SessionId, sessionId); this.numExecutors = numExecutors; this.threadMXBean = ManagementFactory.getThreadMXBean(); this.executorThreadCpuTime = new MutableGaugeLong[numExecutors]; @@ -93,10 +93,10 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sess this.userMetricsInfoMap = new ConcurrentHashMap<>(); for (int i = 0; i < numExecutors; i++) { - MetricsInfo mic = new LlapDaemonCustomMetricsInfo(EXECUTOR_THREAD_CPU_TIME.name() + "_" + i, - EXECUTOR_THREAD_CPU_TIME.description()); - MetricsInfo miu = new LlapDaemonCustomMetricsInfo(EXECUTOR_THREAD_USER_TIME.name() + "_" + i, - EXECUTOR_THREAD_USER_TIME.description()); + MetricsInfo mic = new LlapDaemonCustomMetricsInfo(ExecutorThreadCPUTime.name() + "_" + i, + ExecutorThreadCPUTime.description()); + MetricsInfo miu = new LlapDaemonCustomMetricsInfo(ExecutorThreadUserTime.name() + "_" + i, + ExecutorThreadUserTime.description()); this.cpuMetricsInfoMap.put(i, mic); this.userMetricsInfoMap.put(i, miu); this.executorThreadCpuTime[i] = registry.newGauge(mic, 0L); @@ -107,15 +107,16 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sess public static LlapDaemonExecutorMetrics 
create(String displayName, String sessionId, int numExecutors) { MetricsSystem ms = LlapMetricsSystem.instance(); - JvmMetrics jm = JvmMetrics.create("LlapDaemon", sessionId, ms); + JvmMetrics jm = JvmMetrics.create(MetricsUtils.METRICS_PROCESS_NAME, sessionId, ms); return ms.register(displayName, "LlapDaemon Executor Metrics", new LlapDaemonExecutorMetrics(displayName, jm, sessionId, numExecutors)); } @Override public void getMetrics(MetricsCollector collector, boolean b) { - MetricsRecordBuilder rb = collector.addRecord(LLAP_DAEMON_EXECUTOR_METRICS) - .setContext("llap").tag(ProcessName, "LlapDaemon") + MetricsRecordBuilder rb = collector.addRecord(ExecutorMetrics) + .setContext(MetricsUtils.METRICS_CONTEXT) + .tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME) .tag(SessionId, sessionId); getExecutorStats(rb); } @@ -151,12 +152,12 @@ public void incrExecutorTotalAskedToDie() { private void getExecutorStats(MetricsRecordBuilder rb) { updateThreadMetrics(rb); - rb.addCounter(EXECUTOR_TOTAL_REQUESTS_HANDLED, executorTotalRequestHandled.value()) - .addCounter(EXECUTOR_NUM_QUEUED_REQUESTS, executorNumQueuedRequests.value()) - .addCounter(EXECUTOR_TOTAL_SUCCESS, executorTotalSuccess.value()) - .addCounter(EXECUTOR_TOTAL_EXECUTION_FAILURE, executorTotalExecutionFailed.value()) - .addCounter(EXECUTOR_TOTAL_INTERRUPTED, executorTotalInterrupted.value()) - .addCounter(EXECUTOR_TOTAL_ASKED_TO_DIE, executorTotalAskedToDie.value()); + rb.addCounter(ExecutorTotalRequestsHandled, executorTotalRequestHandled.value()) + .addCounter(ExecutorNumQueuedRequests, executorNumQueuedRequests.value()) + .addCounter(ExecutorTotalSuccess, executorTotalSuccess.value()) + .addCounter(ExecutorTotalExecutionFailure, executorTotalExecutionFailed.value()) + .addCounter(ExecutorTotalInterrupted, executorTotalInterrupted.value()) + .addCounter(ExecutorTotalAskedToDie, executorTotalAskedToDie.value()); } private void updateThreadMetrics(MetricsRecordBuilder rb) { diff --git 
llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonQueueInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonQueueInfo.java new file mode 100644 index 0000000..7df7877 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonQueueInfo.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.metrics; + +import org.apache.hadoop.metrics2.MetricsInfo; + +import com.google.common.base.Objects; + +/** + * Llap daemon producer / consumer queue related metrics. 
+ */ +public enum LlapDaemonQueueInfo implements MetricsInfo { + QueueMetrics("Llap daemon producer/consumer queue related metrics"), + QueueSize("Size of the queue used by producer and consumer"), + PercentileProcessingTime("Percentiles processing time for an element from queue"), + MaxProcessingTime("Max processing time for an element from queue so far"); + + private final String desc; + + LlapDaemonQueueInfo(String desc) { + this.desc = desc; + } + + @Override + public String description() { + return desc; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("name", name()).add("description", desc) + .toString(); + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonQueueMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonQueueMetrics.java new file mode 100644 index 0000000..5905cae --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonQueueMetrics.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.metrics; + +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueInfo.MaxProcessingTime; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueInfo.QueueMetrics; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueInfo.QueueSize; +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; +import org.apache.hadoop.metrics2.lib.MutableRate; + +/** + * + */ +@Metrics(about = "LlapDaemon Queue Metrics", context = MetricsUtils.METRICS_CONTEXT) +public class LlapDaemonQueueMetrics implements MetricsSource { + private final String name; + private final String sessionId; + private final MetricsRegistry registry; + private long maxTime = Long.MIN_VALUE; + + @Metric + MutableGaugeInt queueSize; + @Metric + MutableRate rateOfProcessing; + final MutableQuantiles[] processingTimes; + @Metric + MutableGaugeLong maxProcessingTime; + + private LlapDaemonQueueMetrics(String displayName, String sessionId, int[] intervals) { + this.name = displayName; + this.sessionId = sessionId; + this.registry = new MetricsRegistry("LlapDaemonQueueRegistry"); + this.registry.tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME).tag(SessionId, sessionId); + + final int len = intervals == null ? 
0 : intervals.length; + this.processingTimes = new MutableQuantiles[len]; + for (int i=0; i maxTime) { + maxTime = latency; + maxProcessingTime.set(maxTime); + } + for (MutableQuantiles q : processingTimes) { + q.add(latency); + } + } + + private void getQueueStats(MetricsRecordBuilder rb) { + rb.addGauge(QueueSize, queueSize.value()) + .addGauge(MaxProcessingTime, maxProcessingTime.value()); + + rateOfProcessing.snapshot(rb, true); + + for (MutableQuantiles q : processingTimes) { + q.snapshot(rb, true); + } + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java index 124b8f2..ce1c965 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java @@ -26,6 +26,9 @@ */ public class MetricsUtils { private static final String LOCALHOST = "localhost"; + public static final String METRICS_CONTEXT = "llap"; + public static final String METRICS_PROCESS_NAME = "LlapDaemon"; + public static String getHostName() { try {