diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 282f4cdb0b..670fda6b98 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4207,6 +4207,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " formats. 'none' will not disable LLAP IO for any formats."), LLAP_OBJECT_CACHE_ENABLED("hive.llap.object.cache.enabled", true, "Cache objects (plans, hashtables, etc) in llap"), + LLAP_IO_DECODING_LOCAL_BUFFER_SIZE("hive.llap.io.decoding.metrics.percentiles.buffer.size", + 64, + "Number of Long measures buffered by LLAP IO decoding metrics, under-the-hood this translates to an array of longs per IO thread."), LLAP_IO_DECODING_METRICS_PERCENTILE_INTERVALS("hive.llap.io.decoding.metrics.percentiles.intervals", "30", "Comma-delimited set of integers denoting the desired rollover intervals (in seconds)\n" + "for percentile latency metrics on the LLAP daemon IO decoding time.\n" + diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 0d9077c368..7d869a7fe5 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -130,7 +130,8 @@ private LlapIoImpl(Configuration conf) throws IOException { } } } - this.ioMetrics = LlapDaemonIOMetrics.create(displayName, sessionId, Ints.toArray(intervalList)); + int decodingBufferSize = HiveConf.getIntVar(conf, ConfVars.LLAP_IO_DECODING_LOCAL_BUFFER_SIZE); + this.ioMetrics = LlapDaemonIOMetrics.create(displayName, sessionId, Ints.toArray(intervalList), decodingBufferSize); LOG.info("Started llap daemon metrics with displayName: {} sessionId: {}", displayName, sessionId); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOMetrics.java index 4a0226045f..d91594eceb 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOMetrics.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOMetrics.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; +import com.google.common.base.Preconditions; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsSource; @@ -35,6 +36,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.LongConsumer; + /** * */ @@ -51,8 +55,22 @@ final MutableQuantiles[] decodingTimes; @Metric MutableGaugeLong maxDecodingTime; + private final LongBatchingWrapper longBatchingWrapper; + + private final LongConsumer longConsumer = new LongConsumer() { + @Override public void accept(long value) { + rateOfDecoding.add(value); + if (value > maxTime) { + maxTime = value; + maxDecodingTime.set(maxTime); + } + for (MutableQuantiles q : decodingTimes) { + q.add(value); + } + } + }; - private LlapDaemonIOMetrics(String displayName, String sessionId, int[] intervals) { + private LlapDaemonIOMetrics(String displayName, String sessionId, int[] intervals, int bufferSize) { this.name = displayName; this.sessionId = sessionId; this.registry = new MetricsRegistry("LlapDaemonIORegistry"); @@ -60,6 +78,7 @@ private LlapDaemonIOMetrics(String displayName, String sessionId, int[] interval final int len = intervals == null ? 0 : intervals.length; this.decodingTimes = new MutableQuantiles[len]; + Preconditions.checkArgument(bufferSize >= 0, "Metric Buffer Size can not be negative"); for (int i=0; i maxTime) { - maxTime = latency; - maxDecodingTime.set(maxTime); - } - for (MutableQuantiles q : decodingTimes) { - q.add(latency); - } + longBatchingWrapper.consume(latency); } private void getIoStats(MetricsRecordBuilder rb) { @@ -108,4 +121,53 @@ private void getIoStats(MetricsRecordBuilder rb) { } } + /** + * Simple BP Wrapper + */ + static final class LongBatchingWrapper { + private final int maxSize; + private final ReentrantLock lock; + private final ThreadLocal threadLocal; + private final ThreadLocal threadLocalCount; + private final LongConsumer consumer; + + LongBatchingWrapper(int maxSize, ReentrantLock lock, LongConsumer consumer) { + this.maxSize = maxSize; + this.lock = lock; + this.consumer = consumer; + threadLocal = ThreadLocal.withInitial(() -> new long[maxSize]); + threadLocalCount = ThreadLocal.withInitial(() -> 0); + } + + void consume(long element) { + // case no wrapping + if (maxSize == 0) { + lock.lock(); + try { + consumer.accept(element); + } finally { + lock.unlock(); + } + return; + } + + int count = threadLocalCount.get(); + long[] array = threadLocal.get(); + if (count < maxSize) { + array[count] = element; + threadLocalCount.set(count + 1); + } else { + lock.lock(); + try { + for (int i = 0; i < count; i++) { + consumer.accept(array[i]); + } + consumer.accept(element); + } finally { + lock.unlock(); + } + } + + } + } }