diff --git llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index 50c0e22837..82776abea2 100644
--- llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -14,6 +14,8 @@
 package org.apache.hadoop.hive.llap;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -366,4 +368,19 @@ private static boolean isSomeHiveDir(String p) {
     return p.startsWith(BASE_PREFIX) || p.startsWith(DELTA_PREFIX) || p.startsWith(BUCKET_PREFIX)
         || p.startsWith(UNION_SUDBIR_PREFIX) || p.startsWith(DELETE_DELTA_PREFIX);
   }
+
+
+  public static ThreadMXBean initThreadMxBean() {
+    ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+    if (mxBean != null) {
+      if (!mxBean.isCurrentThreadCpuTimeSupported()) {
+        LOG.warn("Thread CPU monitoring is not supported");
+        return null;
+      } else if (!mxBean.isThreadCpuTimeEnabled()) {
+        LOG.warn("Thread CPU monitoring is not enabled");
+        return null;
+      }
+    }
+    return mxBean;
+  }
 }
diff --git llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
index 059d5b9ae3..d27193f02e 100644
--- llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
+++ llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
@@ -36,7 +36,9 @@
   TOTAL_IO_TIME_NS(false),
   DECODE_TIME_NS(false),
   HDFS_TIME_NS(false),
-  CONSUMER_TIME_NS(false);
+  CONSUMER_TIME_NS(false),
+  IO_CPU_NS(false),
+  IO_USER_NS(false);
 
   // flag to indicate if these counters are subject to change across different test runs
   private boolean testSafe;
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
index 91df036bfd..c2aca5ad44 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
@@ -21,6 +21,7 @@
   void recordCacheHit(long bytesHit);
   void recordCacheMiss(long bytesMissed);
   void recordAllocBytes(long bytesWasted, long bytesAllocated);
-  void recordHdfsTime(long timeUs);
+  void recordHdfsTime(long timeNs);
+  void recordThreadTimes(long cpuNs, long userNs);
   long startTimeCounter();
 }
\ No newline at end of file
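Note on the pattern established above: LlapUtil.initThreadMxBean() returns a usable ThreadMXBean only when per-thread CPU timing is both supported and enabled in the JVM, so callers treat null as "timing off" rather than failing. A minimal standalone sketch of the intended call pattern (ThreadTimeDemo and doWork are illustrative names, not part of this patch):

    import java.lang.management.ThreadMXBean;
    import org.apache.hadoop.hive.llap.LlapUtil;

    public class ThreadTimeDemo {
      public static void main(String[] args) {
        ThreadMXBean mxBean = LlapUtil.initThreadMxBean();
        // Snapshot before the measured section; -1 marks "no measurement".
        long cpu = mxBean == null ? -1 : mxBean.getCurrentThreadCpuTime();
        long user = mxBean == null ? -1 : mxBean.getCurrentThreadUserTime();
        doWork();
        if (mxBean != null) {
          // Deltas are nanoseconds of CPU/user time consumed by this thread only.
          System.out.println("cpu=" + (mxBean.getCurrentThreadCpuTime() - cpu)
              + ", user=" + (mxBean.getCurrentThreadUserTime() - user));
        }
      }

      private static void doWork() {
        // stand-in for the timed work
      }
    }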
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
index be4dfad95c..f5f2982a87 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
@@ -71,7 +71,7 @@ public final long startTimeCounter() {
     return (doUseTimeCounters ? System.nanoTime() : 0);
   }
 
-  public void incrTimeCounter(LlapIOCounters counter, long startTime) {
+  public void incrWallClockCounter(LlapIOCounters counter, long startTime) {
     if (!doUseTimeCounters) return;
     long delta = System.nanoTime() - startTime;
     fixedCounters.addAndGet(counter.ordinal(), delta);
@@ -109,7 +109,13 @@ public void recordAllocBytes(long bytesUsed, long bytesAllocated) {
 
   @Override
   public void recordHdfsTime(long startTime) {
-    incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+    incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+  }
+
+  @Override
+  public void recordThreadTimes(long cpuNs, long userNs) {
+    incrCounter(LlapIOCounters.IO_CPU_NS, cpuNs);
+    incrCounter(LlapIOCounters.IO_USER_NS, userNs);
   }
 
   @Override
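The rename above makes the two timing conventions explicit: incrWallClockCounter() receives a System.nanoTime() value obtained from startTimeCounter() and computes the elapsed wall-clock delta itself, whereas recordThreadTimes() receives CPU/user deltas the caller has already computed from a ThreadMXBean. A short sketch of the two call shapes, assuming an in-scope QueryFragmentCounters instance named fragCounters (illustrative):

    // Wall-clock convention: pass the start timestamp; the counter computes the delta.
    long start = fragCounters.startTimeCounter();
    // ... timed section ...
    fragCounters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, start);

    // Thread-time convention: the caller supplies precomputed nanosecond deltas,
    // taken from ThreadMXBean snapshots as in WrappedCallable below.
    fragCounters.recordThreadTimes(cpuDeltaNs, userDeltaNs);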
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
index 27462e1bcb..16e395098a 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
@@ -31,8 +34,8 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource;
-import org.apache.log4j.NDC;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.counters.FileSystemCounter;
 import org.apache.tez.common.counters.TezCounters;
@@ -50,6 +53,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(StatsRecordingThreadPool.class);
   // uncaught exception handler that will be set for all threads before execution
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+  private final ThreadMXBean mxBean;
 
   public StatsRecordingThreadPool(final int corePoolSize, final int maximumPoolSize,
       final long keepAliveTime,
@@ -66,11 +70,12 @@ public StatsRecordingThreadPool(final int corePoolSize, final int maximumPoolSiz
       final ThreadFactory threadFactory, Thread.UncaughtExceptionHandler handler) {
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
     this.uncaughtExceptionHandler = handler;
+    this.mxBean = LlapUtil.initThreadMxBean();
   }
 
   @Override
   protected <V> RunnableFuture<V> newTaskFor(final Callable<V> callable) {
-    return new FutureTask<>(new WrappedCallable<>(callable, uncaughtExceptionHandler));
+    return new FutureTask<>(new WrappedCallable<>(callable, uncaughtExceptionHandler, mxBean));
   }
 
   public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) {
@@ -86,11 +91,13 @@ public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler)
 
   private static class WrappedCallable<V> implements Callable<V> {
     private Callable<V> actualCallable;
     private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+    private ThreadMXBean mxBean;
 
     WrappedCallable(final Callable<V> callable,
-        final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+        final Thread.UncaughtExceptionHandler uncaughtExceptionHandler, ThreadMXBean mxBean) {
       this.actualCallable = callable;
       this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+      this.mxBean = mxBean;
     }
 
@@ -104,12 +111,18 @@ public V call() throws Exception {
 
       // clone thread local file system statistics
       List<FileSystem.Statistics> statsBefore = LlapUtil.cloneThreadLocalFileSystemStatistics();
-
+      long cpuTime = mxBean == null ? -1 : mxBean.getCurrentThreadCpuTime(),
+          userTime = mxBean == null ? -1 : mxBean.getCurrentThreadUserTime();
       setupMDCFromNDC(actualCallable);
       try {
         return actualCallable.call();
       } finally {
-        updateFileSystemCounters(statsBefore, actualCallable);
+        if (mxBean != null) {
+          cpuTime = mxBean.getCurrentThreadCpuTime() - cpuTime;
+          userTime = mxBean.getCurrentThreadUserTime() - userTime;
+        }
+        updateCounters(statsBefore, actualCallable, cpuTime, userTime);
+        MDC.clear();
       }
     }
 
@@ -148,8 +161,17 @@ private void setupMDCFromNDC(final Callable<V> actualCallable) {
       }
     }
 
-    private void updateFileSystemCounters(final List<FileSystem.Statistics> statsBefore,
-        final Callable<V> actualCallable) {
+    /**
+     * LLAP IO related counters.
+     */
+    public enum LlapExecutorCounters {
+      EXECUTOR_CPU_NS,
+      EXECUTOR_USER_NS;
+
+    }
+
+    private void updateCounters(final List<FileSystem.Statistics> statsBefore,
+        final Callable<V> actualCallable, long cpuTime, long userTime) {
       Thread thread = Thread.currentThread();
       TezCounters tezCounters = null;
       // add tez counters for task execution and llap io
@@ -163,6 +185,10 @@ private void updateCounters(final List statsB
       }
 
       if (tezCounters != null) {
+        if (cpuTime >= 0 && userTime >= 0) {
+          tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_CPU_NS).increment(cpuTime);
+          tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_USER_NS).increment(userTime);
+        }
         if (statsBefore != null) {
           // if there are multiple stats for the same scheme (from different NameNode), this
           // method will squash them together
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 27a5b0f3e4..9ef7af4eb0 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -338,7 +338,7 @@ public boolean next(NullWritable key, VectorizedRowBatch vrb) throws IOException
         if (wasFirst) {
           firstReturnTime = counters.startTimeCounter();
         }
-        counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
+        counters.incrWallClockCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
         return false;
       }
       if (isAcidScan) {
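WrappedCallable above and the EncodedDataConsumer change below apply the same decorator shape: snapshot thread times before delegating, compute the deltas in a finally block so failed tasks are still accounted, and skip reporting entirely when the MX bean is unavailable. A self-contained sketch of that shape, using hypothetical names (TimedCallable, Sink) that do not appear in the patch:

    import java.lang.management.ThreadMXBean;
    import java.util.concurrent.Callable;

    final class TimedCallable<V> implements Callable<V> {
      interface Sink { void accept(long cpuNs, long userNs); }

      private final Callable<V> delegate;
      private final ThreadMXBean mxBean; // null means timing is disabled
      private final Sink sink;

      TimedCallable(Callable<V> delegate, ThreadMXBean mxBean, Sink sink) {
        this.delegate = delegate;
        this.mxBean = mxBean;
        this.sink = sink;
      }

      @Override
      public V call() throws Exception {
        long cpu = mxBean == null ? -1 : mxBean.getCurrentThreadCpuTime();
        long user = mxBean == null ? -1 : mxBean.getCurrentThreadUserTime();
        try {
          return delegate.call();
        } finally {
          // Record in finally so the time of a failed task is not lost.
          if (mxBean != null) {
            sink.accept(mxBean.getCurrentThreadCpuTime() - cpu,
                mxBean.getCurrentThreadUserTime() - user);
          }
        }
      }
    }

This only measures correctly because getCurrentThreadCpuTime() is per-thread and the same pool thread runs the whole call(); the pattern would break if the delegate handed work off to other threads.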
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index d5c2d48db1..080fedfd15 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hive.llap.io.decode;
 
+import java.lang.management.ThreadMXBean;
 import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hive.common.Pool;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
@@ -41,11 +44,14 @@
   // Note that the pool is per EDC - within EDC, CVBs are expected to have the same schema.
   private static final int CVB_POOL_SIZE = 128;
   protected final FixedSizedObjectPool<ColumnVectorBatch> cvbPool;
+  protected final QueryFragmentCounters counters;
+  private final ThreadMXBean mxBean;
 
   public EncodedDataConsumer(Consumer<ColumnVectorBatch> consumer, final int colCount,
-      LlapDaemonIOMetrics ioMetrics) {
+      LlapDaemonIOMetrics ioMetrics, QueryFragmentCounters counters) {
     this.downstreamConsumer = consumer;
     this.ioMetrics = ioMetrics;
+    this.mxBean = LlapUtil.initThreadMxBean();
     cvbPool = new FixedSizedObjectPool<ColumnVectorBatch>(CVB_POOL_SIZE,
         new Pool.PoolObjectHelper<ColumnVectorBatch>() {
           @Override
@@ -57,12 +63,26 @@ public void resetBeforeOffer(ColumnVectorBatch t) {
             // Don't reset anything, we are reusing column vectors.
           }
         });
+    this.counters = counters;
   }
 
   public void init(ConsumerFeedback<BatchType> upstreamFeedback,
       Callable<Void> readCallable) {
     this.upstreamFeedback = upstreamFeedback;
-    this.readCallable = readCallable;
+    this.readCallable = mxBean == null ? readCallable : new Callable<Void>() {
+
+      @Override
+      public Void call() throws Exception {
+        long cpuTime = mxBean.getCurrentThreadCpuTime(),
+            userTime = mxBean.getCurrentThreadUserTime();
+        try {
+          return readCallable.call();
+        } finally {
+          counters.recordThreadTimes(mxBean.getCurrentThreadCpuTime() - cpuTime,
+              mxBean.getCurrentThreadUserTime() - userTime);
+        }
+      }
+    };
   }
 
   @Override
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 40248a37f8..83931c27b1 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -69,7 +69,6 @@
   private CompressionCodec codec;
   private List<ConsumerStripeMetadata> stripes;
   private final boolean skipCorrupt; // TODO: get rid of this
-  private final QueryFragmentCounters counters;
   private SchemaEvolution evolution;
   private IoTrace trace;
   private final Includes includes;
@@ -79,11 +78,10 @@
   public OrcEncodedDataConsumer(
       Consumer<ColumnVectorBatch> consumer, Includes includes, boolean skipCorrupt,
       QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics) {
-    super(consumer, includes.getPhysicalColumnIds().size(), ioMetrics);
+    super(consumer, includes.getPhysicalColumnIds().size(), ioMetrics, counters);
     this.includes = includes;
     // TODO: get rid of this
     this.skipCorrupt = skipCorrupt;
-    this.counters = counters;
   }
 
   public void setUseDecimal64ColumnVectors(final boolean useDecimal64ColumnVectors) {
@@ -209,7 +207,7 @@ protected void decodeBatch(OrcEncodedColumnBatch batch,
         counters.incrCounter(LlapIOCounters.ROWS_EMITTED, batchSize);
       }
       LlapIoImpl.ORC_LOGGER.debug("Done with decode");
-      counters.incrTimeCounter(LlapIOCounters.DECODE_TIME_NS, startTime);
+      counters.incrWallClockCounter(LlapIOCounters.DECODE_TIME_NS, startTime);
       counters.incrCounter(LlapIOCounters.NUM_VECTOR_BATCHES, maxBatchesRG);
       counters.incrCounter(LlapIOCounters.NUM_DECODED_BATCHES);
     } catch (IOException e) {
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 4f5b0a9e65..74cee64172 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -443,7 +443,7 @@ private void ensureDataReader() throws IOException {
   }
 
   private void recordReaderTime(long startTime) {
-    counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
+    counters.incrWallClockCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
   }
 
   private void validateFileMetadata() throws IOException {
@@ -519,7 +519,7 @@ private void ensureOrcReader() throws IOException {
       }
     }
     orcReader = EncodedOrcFile.createReader(path, opts);
-    counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+    counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
   }
 
   /**
@@ -677,7 +677,7 @@ private OrcStripeMetadata createOrcStripeMetadataObject(int stripeIx, StripeInfo
     DiskRangeList footerRange = rawDataReader.readFileData(
         new DiskRangeList(offset, offset + si.getFooterLength()), 0, false);
     // LOG.error("Got " + RecordReaderUtils.stringifyDiskRanges(footerRange));
-    counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+    counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
     assert footerRange.next == null; // Can only happens w/zcr for a single input buffer.
     if (hasCache) {
       LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(
@@ -716,7 +716,7 @@ private void ensureRawDataReader(boolean isOpen) throws IOException {
       if (!isRawDataReaderOpen && isOpen) {
        long startTime = counters.startTimeCounter();
        rawDataReader.open();
-        counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+        counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
       }
       return;
     }
@@ -734,7 +734,7 @@ private void ensureRawDataReader(boolean isOpen) throws IOException {
       rawDataReader.open();
       isRawDataReaderOpen = true;
     }
-    counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+    counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
   }
 
   @Override
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 658bc7d621..a5671e9682 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -1688,7 +1688,7 @@ private void cleanUpCurrentRead() {
   }
 
   private void recordReaderTime(long startTime) {
-    counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
+    counters.incrWallClockCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
   }
 
   private boolean processStop() {
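One operational note on the patch as a whole: every timing site above degrades to a no-op when initThreadMxBean() returns null, i.e. when the JVM reports thread CPU timing as unsupported or disabled. Where it is merely disabled, it can be enabled at process startup; a hedged sketch (EnableThreadCpuTime is an illustrative class, and enabling the feature is a deployment choice, not something this patch does):

    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadMXBean;

    public class EnableThreadCpuTime {
      public static void main(String[] args) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        if (bean.isThreadCpuTimeSupported() && !bean.isThreadCpuTimeEnabled()) {
          bean.setThreadCpuTimeEnabled(true); // adds some per-thread bookkeeping overhead
        }
      }
    }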