diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index ce03de0..1a1d6e4 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -13,14 +13,36 @@
  */
 package org.apache.hadoop.hive.llap;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
+import com.google.common.base.Preconditions;
+
 public class LlapUtil {
   public static String getDaemonLocalDirList(Configuration conf) {
     String localDirList = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_WORK_DIRS);
     if (localDirList != null && !localDirList.isEmpty()) return localDirList;
     return conf.get("yarn.nodemanager.local-dirs");
   }
+
+  public static FileSystem.Statistics cloneFileSystemStatistics(final String scheme,
+      final Class<? extends FileSystem> fsKlass) {
+    Preconditions.checkNotNull(scheme);
+    Preconditions.checkNotNull(fsKlass);
+    FileSystem.Statistics fss = FileSystem.getStatistics(scheme, fsKlass);
+    return new FileSystem.Statistics(fss);
+  }
+
+  public static List<FileSystem.Statistics> cloneFileSystemStatistics() {
+    List<FileSystem.Statistics> cloned = new ArrayList<>();
+    for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
+      cloned.add(new FileSystem.Statistics(statistics));
+    }
+    return cloned;
+  }
 }
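Note for reviewers: Hadoop's `FileSystem.getStatistics()` and `getAllStatistics()` return live objects that keep mutating as I/O proceeds, which is why both helpers copy them through the `Statistics` copy constructor. A minimal sketch of the snapshot/delta pattern these helpers enable, assuming only hadoop-common on the classpath (the `/tmp` path and the raw local FS are illustrative only):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsStatsSnapshotDemo {
  public static void main(String[] args) throws Exception {
    // Raw local FS so the bytes are accounted on the same Statistics entry we sample.
    FileSystem fs = FileSystem.getLocal(new Configuration()).getRawFileSystem();

    // Snapshot before the work: copy, because the registered object keeps changing.
    FileSystem.Statistics before =
        new FileSystem.Statistics(FileSystem.getStatistics("file", fs.getClass()));

    Path p = new Path("/tmp/fs-stats-demo.txt");
    try (java.io.OutputStream out = fs.create(p, true)) {
      out.write(new byte[1024]); // counted against the "file" scheme statistics
    }

    // Snapshot after, then diff the two frozen copies.
    FileSystem.Statistics after =
        new FileSystem.Statistics(FileSystem.getStatistics("file", fs.getClass()));
    System.out.println("bytesWritten delta = "
        + (after.getBytesWritten() - before.getBytesWritten()));
    fs.delete(p, false);
  }
}
```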
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
index 365ddab..dc72a74 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
@@ -15,23 +15,52 @@
  */
 package org.apache.hadoop.hive.llap.counters;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * LLAP IO related counters.
  */
 public enum LlapIOCounters {
-  NUM_VECTOR_BATCHES,
-  NUM_DECODED_BATCHES,
-  SELECTED_ROWGROUPS,
-  NUM_ERRORS,
-  ROWS_EMITTED,
-  METADATA_CACHE_HIT,
-  METADATA_CACHE_MISS,
-  CACHE_HIT_BYTES,
-  CACHE_MISS_BYTES,
-  ALLOCATED_BYTES,
-  ALLOCATED_USED_BYTES,
-  TOTAL_IO_TIME_NS,
-  DECODE_TIME_NS,
-  HDFS_TIME_NS,
-  CONSUMER_TIME_NS
+  NUM_VECTOR_BATCHES(true),
+  NUM_DECODED_BATCHES(true),
+  SELECTED_ROWGROUPS(true),
+  NUM_ERRORS(true),
+  ROWS_EMITTED(true),
+  METADATA_CACHE_HIT(true),
+  METADATA_CACHE_MISS(true),
+  CACHE_HIT_BYTES(true),
+  CACHE_MISS_BYTES(true),
+  ALLOCATED_BYTES(true),
+  ALLOCATED_USED_BYTES(true),
+  TOTAL_IO_TIME_NS(false),
+  DECODE_TIME_NS(false),
+  HDFS_TIME_NS(false),
+  CONSUMER_TIME_NS(false),
+  // FileSystem counters. These counters are typically prefixed with the scheme.
+  BYTES_READ(false),
+  BYTES_WRITTEN(false),
+  READ_OPS(false),
+  LARGE_READ_OPS(false),
+  WRITE_OPS(false);
+
+  // prefix used for the LLAP async I/O elevator
+  public static final String LLAP_IO_PREFIX = "LLAP_IO_";
+
+  // flag to indicate whether the counter value is stable (safe to compare) across test runs
+  private final boolean testSafe;
+
+  LlapIOCounters(final boolean testSafe) {
+    this.testSafe = testSafe;
+  }
+
+  public static List<String> testSafeCounterNames() {
+    List<String> testSafeCounters = new ArrayList<>();
+    for (LlapIOCounters counter : values()) {
+      if (counter.testSafe) {
+        testSafeCounters.add(counter.name());
+      }
+    }
+    return testSafeCounters;
+  }
 }
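A small, hypothetical driver showing how this split is meant to be consumed downstream: deterministic counters can be echoed verbatim into golden-file test output, while timing- and FS-dependent ones get filtered out:

```java
import java.util.List;

import org.apache.hadoop.hive.llap.counters.LlapIOCounters;

public class TestSafeFilterDemo {
  public static void main(String[] args) {
    List<String> stable = LlapIOCounters.testSafeCounterNames();
    for (LlapIOCounters c : LlapIOCounters.values()) {
      // e.g. ROWS_EMITTED -> printed, TOTAL_IO_TIME_NS -> filtered
      System.out.println(c.name()
          + (stable.contains(c.name()) ? " -> printed" : " -> filtered"));
    }
  }
}
```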
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/FragmentCountersMap.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/FragmentCountersMap.java
index 383b65f..32ec830 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/FragmentCountersMap.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/FragmentCountersMap.java
@@ -15,9 +15,13 @@
  */
 package org.apache.hadoop.hive.llap.counters;
 
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,12 +32,20 @@ public class FragmentCountersMap {
   private static final Logger LOG = LoggerFactory.getLogger(FragmentCountersMap.class);
   private static final ConcurrentMap<String, TezCounters> perFragmentCounters = new ConcurrentHashMap<>();
+  private static final ConcurrentMap<String, List<FileSystem.Statistics>> perFragmentFSStats = new ConcurrentHashMap<>();
 
   public static void registerCountersForFragment(String identifier, TezCounters tezCounters) {
     if (perFragmentCounters.putIfAbsent(identifier, tezCounters) != null) {
       LOG.warn("Not registering duplicate counters for fragment with tez identifier string=" + identifier);
     }
+
+    List<FileSystem.Statistics> clonedFSStats = LlapUtil.cloneFileSystemStatistics();
+    if (perFragmentFSStats.putIfAbsent(identifier, clonedFSStats) != null) {
+      LOG.warn("Not registering duplicate filesystem statistics for fragment with tez identifier string="
+          + identifier);
+    }
+
   }
 
   public static TezCounters getCountersForFragment(String identifier) {
@@ -42,5 +54,67 @@ public static TezCounters getCountersForFragment(String identifier) {
 
   public static void unregisterCountersForFragment(String identifier) {
     perFragmentCounters.remove(identifier);
+    perFragmentFSStats.remove(identifier);
+  }
+
+  public static void updateFileSystemCounters(final String identifier) {
+    List<FileSystem.Statistics> allFsStatsBefore = perFragmentFSStats.get(identifier);
+    TezCounters tezCounters = perFragmentCounters.get(identifier);
+    if (tezCounters != null && allFsStatsBefore != null) {
+      String counterGroup = LlapIOCounters.class.getName();
+      List<FileSystem.Statistics> allFsStatsAfter = FileSystem.getAllStatistics();
+      // FS stats at task unregister can have more entries than at register time.
+      // Example: stats during unregister may have "hdfs" and "file" as schemes, whereas during
+      // register it may only have "hdfs" (no local file system accessed at start).
+      for (FileSystem.Statistics fsStatsAfter : allFsStatsAfter) {
+        String scheme = fsStatsAfter.getScheme().toUpperCase();
+        FileSystem.Statistics fsStatsBefore = getFSStatsForScheme(allFsStatsBefore, scheme);
+        if (fsStatsBefore != null) {
+          long readBytesDelta = fsStatsAfter.getBytesRead() - fsStatsBefore.getBytesRead();
+          long writeBytesDelta = fsStatsAfter.getBytesWritten() - fsStatsBefore.getBytesWritten();
+          long readOpsDelta = fsStatsAfter.getReadOps() - fsStatsBefore.getReadOps();
+          long largeReadOpsDelta =
+              fsStatsAfter.getLargeReadOps() - fsStatsBefore.getLargeReadOps();
+          long writeOpsDelta = fsStatsAfter.getWriteOps() - fsStatsBefore.getWriteOps();
+          tezCounters.findCounter(counterGroup,
+              scheme + "_" + LlapIOCounters.BYTES_READ.name()).increment(readBytesDelta);
+          tezCounters.findCounter(counterGroup,
+              scheme + "_" + LlapIOCounters.BYTES_WRITTEN.name()).increment(writeBytesDelta);
+          tezCounters.findCounter(counterGroup,
+              scheme + "_" + LlapIOCounters.READ_OPS.name()).increment(readOpsDelta);
+          tezCounters.findCounter(counterGroup,
+              scheme + "_" + LlapIOCounters.LARGE_READ_OPS.name()).increment(largeReadOpsDelta);
+          tezCounters.findCounter(counterGroup,
+              scheme + "_" + LlapIOCounters.WRITE_OPS.name()).increment(writeOpsDelta);
+        } else {
+          tezCounters.findCounter(counterGroup,
+              scheme + "_" + LlapIOCounters.BYTES_READ.name()).increment(fsStatsAfter.getBytesRead());
+          tezCounters.findCounter(counterGroup,
+              scheme + "_" + LlapIOCounters.BYTES_WRITTEN.name())
+              .increment(fsStatsAfter.getBytesWritten());
+          tezCounters.findCounter(counterGroup,
+              scheme + "_" + LlapIOCounters.READ_OPS.name()).increment(fsStatsAfter.getReadOps());
+          tezCounters.findCounter(counterGroup,
+              scheme + "_" + LlapIOCounters.LARGE_READ_OPS.name())
+              .increment(fsStatsAfter.getLargeReadOps());
+          tezCounters.findCounter(counterGroup,
+              scheme + "_" + LlapIOCounters.WRITE_OPS.name()).increment(fsStatsAfter.getWriteOps());
+        }
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("FileSystem stats updated for fragId {}: {}", identifier,
+            tezCounters.toShortString());
+      }
+    }
+  }
+
+  private static FileSystem.Statistics getFSStatsForScheme(final List<FileSystem.Statistics> stats,
+      final String scheme) {
+    for (FileSystem.Statistics stat : stats) {
+      if (stat.getScheme().equalsIgnoreCase(scheme)) {
+        return stat;
+      }
+    }
+    return null;
   }
 }
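The per-scheme bookkeeping above reduces to one rule, restated here as a hypothetical helper (not part of the patch) in case the null branch is hard to follow: if a scheme has no "before" snapshot, the whole "after" value counts as the fragment's delta.

```java
import org.apache.hadoop.fs.FileSystem;

final class FsDeltaRule {
  // before == null means the scheme was first touched mid-fragment
  // (e.g. local "file" access that started only after registration).
  static long bytesReadDelta(FileSystem.Statistics before, FileSystem.Statistics after) {
    return before == null
        ? after.getBytesRead()
        : after.getBytesRead() - before.getBytesRead();
  }
}
```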
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
index a53ac61..3158a63 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
@@ -17,18 +17,24 @@
  */
 package org.apache.hadoop.hive.llap.counters;
 
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLongArray;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.cache.LowLevelCacheCounters;
+import org.apache.tez.common.counters.FileSystemCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Per query counters.
  */
 public class QueryFragmentCounters implements LowLevelCacheCounters {
+  private static final Logger LOG = LoggerFactory.getLogger(QueryFragmentCounters.class);
   private final boolean doUseTimeCounters;
 
   public static enum Desc {
@@ -112,6 +118,29 @@ public void recordHdfsTime(long startTime) {
     incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
   }
 
+  public void updateLlapIOFileSystemCounters(final FileSystem.Statistics fsStatsBefore,
+      final FileSystem.Statistics fsStatsAfter) {
+    if (tezCounters != null && fsStatsBefore != null && fsStatsAfter != null) {
+      final String counterGroup = LlapIOCounters.class.getName();
+      final String scheme = fsStatsBefore.getScheme().toUpperCase();
+      final String prefix = LlapIOCounters.LLAP_IO_PREFIX + scheme + "_";
+      long readBytesDelta = fsStatsAfter.getBytesRead() - fsStatsBefore.getBytesRead();
+      long writeBytesDelta = fsStatsAfter.getBytesWritten() - fsStatsBefore.getBytesWritten();
+      long readOpsDelta = fsStatsAfter.getReadOps() - fsStatsBefore.getReadOps();
+      long largeReadOpsDelta = fsStatsAfter.getLargeReadOps() - fsStatsBefore.getLargeReadOps();
+      long writeOpsDelta = fsStatsAfter.getWriteOps() - fsStatsBefore.getWriteOps();
+
+      tezCounters.findCounter(counterGroup, prefix + LlapIOCounters.BYTES_READ).increment(readBytesDelta);
+      tezCounters.findCounter(counterGroup, prefix + LlapIOCounters.BYTES_WRITTEN).increment(writeBytesDelta);
+      tezCounters.findCounter(counterGroup, prefix + LlapIOCounters.READ_OPS).increment(readOpsDelta);
+      tezCounters.findCounter(counterGroup, prefix + LlapIOCounters.LARGE_READ_OPS).increment(largeReadOpsDelta);
+      tezCounters.findCounter(counterGroup, prefix + LlapIOCounters.WRITE_OPS).increment(writeOpsDelta);
+    } else {
+      LOG.warn("Not updating FS stats for LLAP IO. tezCounters: {}," +
+          " fsStatsBefore: {} fsStatsAfter: {}", tezCounters, fsStatsBefore, fsStatsAfter);
+    }
+  }
+
   @Override
   public String toString() {
     // We rely on NDC information in the logs to map counters to attempt.
@@ -135,4 +164,5 @@ public String toString() {
     sb.append(" ]");
     return sb.toString();
   }
+
 }
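For reference, the counter names written here compose as `LLAP_IO_<SCHEME>_<COUNTER>`; a trivial standalone check (the `HDFS` scheme is hard-coded purely for illustration):

```java
import org.apache.hadoop.hive.llap.counters.LlapIOCounters;

public class LlapIoCounterNameDemo {
  public static void main(String[] args) {
    String prefix = LlapIOCounters.LLAP_IO_PREFIX + "HDFS" + "_";
    // Prints: LLAP_IO_HDFS_BYTES_READ (an enum's toString() is its name())
    System.out.println(prefix + LlapIOCounters.BYTES_READ);
  }
}
```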
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index 08c6f27..6eab32b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.tez.common.counters.TezCounters;
@@ -430,12 +431,14 @@ public void onFailure(Throwable t) {
   }
 
   public synchronized boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
+    FragmentCountersMap.updateFileSystemCounters(fragmentFullId);
     return currentCallable.taskSucceeded(taskAttemptID);
   }
 
   @Override
   public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
       EventMetaData srcMeta) throws IOException, TezException {
+    FragmentCountersMap.updateFileSystemCounters(fragmentFullId);
     return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
   }
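Taken together with FragmentCountersMap, the pairing the reporter relies on looks roughly like this (a sketch with a hypothetical `runFragment`; only the two `updateFileSystemCounters` calls above are actually added by this patch):

```java
import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
import org.apache.tez.common.counters.TezCounters;

public class FragmentFsCountersLifecycle {
  // Hypothetical driver: register snapshots the FS stats, update folds the
  // deltas into the fragment's TezCounters, unregister drops the snapshot.
  static void run(String fragId, TezCounters tezCounters, Runnable runFragment) {
    FragmentCountersMap.registerCountersForFragment(fragId, tezCounters);
    try {
      runFragment.run();
    } finally {
      FragmentCountersMap.updateFileSystemCounters(fragId);      // success or failure path
      FragmentCountersMap.unregisterCountersForFragment(fragId); // avoid leaking snapshots
    }
  }
}
```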
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index fb0867d..85a2115 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -25,6 +25,7 @@
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +67,6 @@
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
-import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
 import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
@@ -140,6 +140,7 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
   private final OrcEncodedDataConsumer consumer;
   private final QueryFragmentCounters counters;
   private final UserGroupInformation ugi;
+  private FileSystem.Statistics fsStatsBefore;
 
   // Read state.
   private int stripeIxFrom;
@@ -304,7 +305,7 @@ protected Void performDataRead() throws IOException {
     try {
       ensureOrcReader();
       // Reader creation updates HDFS counters, don't do it here.
-      DataWrapperForOrc dw = new DataWrapperForOrc();
+      DataWrapperForOrc dw = new DataWrapperForOrc(split.getPath());
       stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY);
       stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
     } catch (Throwable t) {
@@ -801,13 +802,15 @@ public void determineStripesToRead() {
 
   private class DataWrapperForOrc implements DataReader, DataCache {
     private final DataReader orcDataReader;
+    private final Path path;
 
-    public DataWrapperForOrc() {
+    public DataWrapperForOrc(final Path path) {
       boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
       if (useZeroCopy && !getAllocator().isDirectAlloc()) {
         throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache " +
             "buffers; either disable zero-copy or enable direct cache allocation");
       }
+      this.path = path;
       this.orcDataReader = orcReader.createDefaultDataReader(useZeroCopy);
     }
@@ -844,6 +847,9 @@ public Allocator getAllocator() {
     @Override
     public void close() throws IOException {
       orcDataReader.close();
+      FileSystem.Statistics fsStatsAfter = LlapUtil
+          .cloneFileSystemStatistics(path.toUri().getScheme(), fs.getClass());
+      counters.updateLlapIOFileSystemCounters(fsStatsBefore, fsStatsAfter);
     }
 
     @Override
@@ -872,6 +878,8 @@ public void releaseBuffer(ByteBuffer buffer) {
     @Override
     public void open() throws IOException {
       long startTime = counters.startTimeCounter();
+      fsStatsBefore = LlapUtil
+          .cloneFileSystemStatistics(path.toUri().getScheme(), fs.getClass());
      orcDataReader.open();
       counters.recordHdfsTime(startTime);
     }
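The open()/close() pair above brackets the elevator's reads for a single file. Stripped of the ORC plumbing, the pattern is the following (a sketch: `doRead` stands in for the EncodedReader work between `DataWrapperForOrc.open()` and `close()`; the other names are from the patch):

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;

final class ElevatorReadAccounting {
  // Snapshot on open, diff on close; both clones are cheap copies of the live stats.
  static void readWithAccounting(Path path, FileSystem fs, QueryFragmentCounters counters,
      Runnable doRead) {
    FileSystem.Statistics before =
        LlapUtil.cloneFileSystemStatistics(path.toUri().getScheme(), fs.getClass());
    try {
      doRead.run(); // the elevator's actual HDFS reads happen here
    } finally {
      FileSystem.Statistics after =
          LlapUtil.cloneFileSystemStatistics(path.toUri().getScheme(), fs.getClass());
      counters.updateLlapIOFileSystemCounters(before, after);
    }
  }
}
```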
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 7a7e2ac..64f8b02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -76,6 +76,7 @@
   private static final String QUERY_EXEC_SUMMARY_HEADER = "Query Execution Summary";
   private static final String TASK_SUMMARY_HEADER = "Task Execution Summary";
   private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary";
+  private static final String FS_COUNTERS_SUMMARY_HEADER = "FileSystem Counters Summary";
 
   // keep this within 80 chars width. If more columns need to be added then update min terminal
   // width requirement and SEPARATOR width accordingly
@@ -96,6 +97,12 @@
       "VERTICES", "ROWGROUPS", "META_HIT", "META_MISS", "DATA_HIT", "DATA_MISS",
       "ALLOCATION", "USED", "TOTAL_IO");
 
+  // FileSystem counters
+  private static final String FS_COUNTERS_HEADER_FORMAT = "%10s %15s %15s %15s %16s %15s";
+  private static final String FS_COUNTERS_HEADER = String.format(FS_COUNTERS_HEADER_FORMAT,
+      "VERTICES", "HDFS_READ [OPS]", "HDFS_WRITE [OPS]", "FILE_READ [OPS]", "FILE_WRITE [OPS]",
+      "LLAP_READ [OPS]");
+
   // Methods summary
   private static final String OPERATION_SUMMARY = "%-35s %9s";
   private static final String OPERATION = "OPERATION";
@@ -373,6 +380,11 @@ public int monitorExecution(final DAGClient dagClient, HiveConf conf,
           console.printInfo(LLAP_IO_SUMMARY_HEADER);
           printLlapIOSummary(progressMap, console, dagClient);
           console.printInfo(SEPARATOR);
+          console.printInfo("");
+
+          console.printInfo(FS_COUNTERS_SUMMARY_HEADER);
+          printFSCountersSummary(progressMap, console, dagClient);
+          console.printInfo(SEPARATOR);
         }
 
         console.printInfo("\n");
@@ -651,6 +663,64 @@ private void printLlapIOSummary(Map<String, Progress> progressMap, LogHelper console,
     }
   }
 
+  private void printFSCountersSummary(Map<String, Progress> progressMap, LogHelper console,
+      DAGClient dagClient) {
+    SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
+    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+    boolean first = false;
+    String counterGroup = LlapIOCounters.class.getName();
+    for (String vertexName : keys) {
+      TezCounters vertexCounters = null;
+      try {
+        vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions)
+            .getVertexCounters();
+      } catch (IOException e) {
+        // best attempt, shouldn't really kill DAG for this
+      } catch (TezException e) {
+        // best attempt, shouldn't really kill DAG for this
+      }
+      if (vertexCounters != null) {
+        final long hdfsBytesRead = getCounterValueByGroupName(vertexCounters,
+            counterGroup, "HDFS_" + LlapIOCounters.BYTES_READ.name());
+        final long hdfsReadOps = getCounterValueByGroupName(vertexCounters,
+            counterGroup, "HDFS_" + LlapIOCounters.READ_OPS.name());
+        final long fileBytesRead = getCounterValueByGroupName(vertexCounters,
+            counterGroup, "FILE_" + LlapIOCounters.BYTES_READ.name());
+        final long fileReadOps = getCounterValueByGroupName(vertexCounters,
+            counterGroup, "FILE_" + LlapIOCounters.READ_OPS.name());
+        final long hdfsBytesWritten = getCounterValueByGroupName(vertexCounters,
+            counterGroup, "HDFS_" + LlapIOCounters.BYTES_WRITTEN.name());
+        final long hdfsWriteOps = getCounterValueByGroupName(vertexCounters,
+            counterGroup, "HDFS_" + LlapIOCounters.WRITE_OPS.name());
+        final long fileBytesWritten = getCounterValueByGroupName(vertexCounters,
+            counterGroup, "FILE_" + LlapIOCounters.BYTES_WRITTEN.name());
+        final long fileWriteOps = getCounterValueByGroupName(vertexCounters,
+            counterGroup, "FILE_" + LlapIOCounters.WRITE_OPS.name());
+        final long llapBytesRead = getCounterValueByGroupName(vertexCounters,
+            counterGroup, "LLAP_IO_HDFS_" + LlapIOCounters.BYTES_READ.name());
+        final long llapReadOps = getCounterValueByGroupName(vertexCounters,
+            counterGroup, "LLAP_IO_HDFS_" + LlapIOCounters.READ_OPS.name());
+
+        if (!first) {
+          console.printInfo(SEPARATOR);
+          reprintLineWithColorAsBold(FS_COUNTERS_HEADER, Ansi.Color.CYAN);
+          console.printInfo(SEPARATOR);
+          first = true;
+        }
+
+        String fsCountersSummary = String.format(FS_COUNTERS_HEADER_FORMAT,
+            vertexName,
+            humanReadableByteCount(hdfsBytesRead) + " [" + hdfsReadOps + "]",
+            humanReadableByteCount(hdfsBytesWritten) + " [" + hdfsWriteOps + "]",
+            humanReadableByteCount(fileBytesRead) + " [" + fileReadOps + "]",
+            humanReadableByteCount(fileBytesWritten) + " [" + fileWriteOps + "]",
+            humanReadableByteCount(llapBytesRead) + " [" + llapReadOps + "]");
+        console.printInfo(fsCountersSummary);
+      }
+    }
+  }
+
   private void printStatusInPlace(Map<String, Progress> progressMap, long startTime,
       boolean vextexStatusFromAM, DAGClient dagClient) {
     StringBuilder reportBuffer = new StringBuilder();
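A quick way to sanity-check the new column layout is to run the format string with placeholder values (the byte figures below are invented, purely for illustration):

```java
public class FsCountersHeaderDemo {
  public static void main(String[] args) {
    final String fmt = "%10s %15s %15s %15s %16s %15s"; // FS_COUNTERS_HEADER_FORMAT
    System.out.println(String.format(fmt, "VERTICES", "HDFS_READ [OPS]", "HDFS_WRITE [OPS]",
        "FILE_READ [OPS]", "FILE_WRITE [OPS]", "LLAP_READ [OPS]"));
    System.out.println(String.format(fmt, "Map 1", "1.2MB [42]", "0B [0]", "0B [0]", "0B [0]",
        "1.2MB [42]"));
  }
}
```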
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
index 81bda08..29ba6ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
@@ -19,6 +19,7 @@
 
 import java.util.List;
 
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -63,6 +64,17 @@ public void run(HookContext hookContext) throws Exception {
         for (TezCounter counter : group) {
           console.printError(" " + counter.getDisplayName() + ": " + counter.getValue());
         }
+      } else if (LlapIOCounters.class.getName().equals(group.getDisplayName())) {
+        console.printError(tezTask.getId() + " LLAP IO COUNTERS:");
+        List<String> testSafeCounters = LlapIOCounters.testSafeCounterNames();
+        for (TezCounter counter : group) {
+          // For now, print only the elevator (LLAP_IO_*) and test-safe counters; the rest can
+          // vary across runs and cause unwanted test failures. This needs tez-0.8.3 to work properly.
+          if (counter.getDisplayName().contains(LlapIOCounters.LLAP_IO_PREFIX) ||
+              testSafeCounters.contains(counter.getDisplayName())) {
+            console.printError(" " + counter.getDisplayName() + ": " + counter.getValue());
+          }
+        }
       }
     }
   }
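The print condition added above reduces to a single predicate; extracted here as a hypothetical helper (not part of the patch) for readability:

```java
import java.util.List;

import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.tez.common.counters.TezCounter;

final class LlapCounterPrintFilter {
  // Show a counter if it is an elevator counter (LLAP_IO_*) or is stable
  // enough to be in the test-safe list.
  static boolean shouldPrint(TezCounter counter, List<String> testSafeCounters) {
    String name = counter.getDisplayName();
    return name.contains(LlapIOCounters.LLAP_IO_PREFIX)
        || testSafeCounters.contains(name);
  }
}
```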