diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java new file mode 100644 index 0000000..abff33c --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.counters; + +/** + * LLAP IO related counters. + */ +public enum LlapIOCounters { + NUM_VECTOR_BATCHES, + NUM_DECODED_BATCHES, + SELECTED_ROWGROUPS, + NUM_ERRORS, + ROWS_EMITTED, + METADATA_CACHE_HIT, + METADATA_CACHE_MISS, + CACHE_HIT_BYTES, + CACHE_MISS_BYTES, + ALLOCATED_BYTES, + ALLOCATED_USED_BYTES, + TOTAL_IO_TIME_US, + DECODE_TIME_US, + HDFS_TIME_US, + CONSUMER_TIME_US +} diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/FragmentCountersMap.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/FragmentCountersMap.java new file mode 100644 index 0000000..383b65f --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/FragmentCountersMap.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.counters; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.tez.common.counters.TezCounters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintains references to Tez counters, keyed by fragment identifier. + */ +public class FragmentCountersMap { + private static final Logger LOG = LoggerFactory.getLogger(FragmentCountersMap.class); + private static final ConcurrentMap<String, TezCounters> perFragmentCounters = new ConcurrentHashMap<>(); + + public static void registerCountersForFragment(String identifier, TezCounters tezCounters) { + if (perFragmentCounters.putIfAbsent(identifier, tezCounters) != null) { + LOG.warn("Not registering duplicate counters for fragment with tez identifier string=" + + identifier); + } + } + + public static TezCounters getCountersForFragment(String identifier) { + return perFragmentCounters.get(identifier); + } + + public static void unregisterCountersForFragment(String identifier) { + perFragmentCounters.remove(identifier); + } +} diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java index 5d16f72..cd54539 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.cache.LowLevelCacheCounters; +import org.apache.tez.common.counters.TezCounters; /** * Per query counters. @@ -30,24 +31,6 @@ public class QueryFragmentCounters implements LowLevelCacheCounters { private final boolean doUseTimeCounters; - public static enum Counter { - NUM_VECTOR_BATCHES, - NUM_DECODED_BATCHES, - SELECTED_ROWGROUPS, - NUM_ERRORS, - ROWS_EMITTED, - METADATA_CACHE_HIT, - METADATA_CACHE_MISS, - CACHE_HIT_BYTES, - CACHE_MISS_BYTES, - ALLOCATED_BYTES, - ALLOCATED_USED_BYTES, - TOTAL_IO_TIME_US, - DECODE_TIME_US, - HDFS_TIME_US, - CONSUMER_TIME_US - } - public static enum Desc { MACHINE, TABLE, @@ -57,25 +40,30 @@ private final AtomicLongArray fixedCounters; private final Object[] descs; + private final TezCounters tezCounters; - public QueryFragmentCounters(Configuration conf) { - fixedCounters = new AtomicLongArray(Counter.values().length); + public QueryFragmentCounters(Configuration conf, final TezCounters tezCounters) { + fixedCounters = new AtomicLongArray(LlapIOCounters.values().length); descs = new Object[Desc.values().length]; doUseTimeCounters = HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_ENABLE_TIME_COUNTERS); + this.tezCounters = tezCounters; if (!doUseTimeCounters) { - setCounter(Counter.TOTAL_IO_TIME_US, -1); - setCounter(Counter.DECODE_TIME_US, -1); - setCounter(Counter.HDFS_TIME_US, -1); - setCounter(Counter.CONSUMER_TIME_US, -1); + setCounter(LlapIOCounters.TOTAL_IO_TIME_US, -1); + setCounter(LlapIOCounters.DECODE_TIME_US, -1); + setCounter(LlapIOCounters.HDFS_TIME_US, -1); + setCounter(LlapIOCounters.CONSUMER_TIME_US, -1); } } - public void incrCounter(Counter counter) { + public void incrCounter(LlapIOCounters counter) { incrCounter(counter, 1); } - public void incrCounter(Counter counter, long delta) { + public void incrCounter(LlapIOCounters counter, long delta) { fixedCounters.addAndGet(counter.ordinal(), delta); + if (tezCounters != null) { +
tezCounters.findCounter(LlapIOCounters.values()[counter.ordinal()]).increment(delta); + } } @Override @@ -83,13 +71,20 @@ public final long startTimeCounter() { return (doUseTimeCounters ? System.nanoTime() : 0); } - public void incrTimeCounter(Counter counter, long startTime) { + public void incrTimeCounter(LlapIOCounters counter, long startTime) { if (!doUseTimeCounters) return; - fixedCounters.addAndGet(counter.ordinal(), System.nanoTime() - startTime); + long delta = System.nanoTime() - startTime; + fixedCounters.addAndGet(counter.ordinal(), delta); + if (tezCounters != null) { + tezCounters.findCounter(LlapIOCounters.values()[counter.ordinal()]).increment(delta); + } } - public void setCounter(Counter counter, long value) { + public void setCounter(LlapIOCounters counter, long value) { fixedCounters.set(counter.ordinal(), value); + if (tezCounters != null) { + tezCounters.findCounter(LlapIOCounters.values()[counter.ordinal()]).setValue(value); + } } public void setDesc(Desc key, Object desc) { @@ -98,23 +93,23 @@ public void setDesc(Desc key, Object desc) { @Override public void recordCacheHit(long bytesHit) { - incrCounter(Counter.CACHE_HIT_BYTES, bytesHit); + incrCounter(LlapIOCounters.CACHE_HIT_BYTES, bytesHit); } @Override public void recordCacheMiss(long bytesMissed) { - incrCounter(Counter.CACHE_MISS_BYTES, bytesMissed); + incrCounter(LlapIOCounters.CACHE_MISS_BYTES, bytesMissed); } @Override public void recordAllocBytes(long bytesUsed, long bytesAllocated) { - incrCounter(Counter.ALLOCATED_USED_BYTES, bytesUsed); - incrCounter(Counter.ALLOCATED_BYTES, bytesAllocated); + incrCounter(LlapIOCounters.ALLOCATED_USED_BYTES, bytesUsed); + incrCounter(LlapIOCounters.ALLOCATED_BYTES, bytesAllocated); } @Override public void recordHdfsTime(long startTime) { - incrTimeCounter(Counter.HDFS_TIME_US, startTime); + incrTimeCounter(LlapIOCounters.HDFS_TIME_US, startTime); } @Override @@ -135,7 +130,7 @@ public String toString() { if (i != 0) { sb.append(", "); } - sb.append(Counter.values()[i].name()).append("=").append(fixedCounters.get(i)); + sb.append(LlapIOCounters.values()[i].name()).append("=").append(fixedCounters.get(i)); } sb.append(" ]"); return sb.toString(); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index bb9f341..08c6f27 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -34,6 +34,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezException; @@ -43,11 +44,11 @@ import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; -import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import 
org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.apache.tez.runtime.task.ErrorReporter; import org.slf4j.Logger; @@ -71,13 +72,13 @@ public class LlapTaskReporter implements TaskReporterInterface { private static final Logger LOG = LoggerFactory.getLogger(LlapTaskReporter.class); - private final LlapTaskUmbilicalProtocol umbilical; private final long pollInterval; private final long sendCounterInterval; private final int maxEventsToGet; private final AtomicLong requestCounter; private final String containerIdStr; + private final String fragmentFullId; private final ListeningExecutorService heartbeatExecutor; @@ -85,13 +86,15 @@ HeartbeatCallable currentCallable; public LlapTaskReporter(LlapTaskUmbilicalProtocol umbilical, long amPollInterval, - long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) { + long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, + String containerIdStr, final String fragFullId) { this.umbilical = umbilical; this.pollInterval = amPollInterval; this.sendCounterInterval = sendCounterInterval; this.maxEventsToGet = maxEventsToGet; this.requestCounter = requestCounter; this.containerIdStr = containerIdStr; + this.fragmentFullId = fragFullId; ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("TaskHeartbeatThread").build()); heartbeatExecutor = MoreExecutors.listeningDecorator(executor); @@ -103,6 +106,9 @@ public LlapTaskReporter(LlapTaskUmbilicalProtocol umbilical, long amPollInterval @Override public synchronized void registerTask(RuntimeTask task, ErrorReporter errorReporter) { + TezCounters tezCounters = task.addAndGetTezCounter(fragmentFullId); + FragmentCountersMap.registerCountersForFragment(fragmentFullId, tezCounters); + LOG.info("Registered counters for fragment: {} vertexName: {}", fragmentFullId, task.getVertexName()); currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval, maxEventsToGet, requestCounter, containerIdStr); ListenableFuture future = heartbeatExecutor.submit(currentCallable); @@ -115,6 +121,8 @@ public synchronized void registerTask(RuntimeTask task, */ @Override public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) { + LOG.info("Unregistered counters for fragment: {}", fragmentFullId); + FragmentCountersMap.unregisterCountersForFragment(fragmentFullId); currentCallable.markComplete(); currentCallable = null; } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index d88d82a..a1cfbb8 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -51,6 +51,10 @@ import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -64,6 +68,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; 
import com.google.common.base.Stopwatch; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; @@ -201,13 +206,20 @@ public LlapTaskUmbilicalProtocol run() throws Exception { } }); + TezTaskAttemptID taskAttemptID = taskSpec.getTaskAttemptID(); + TezTaskID taskId = taskAttemptID.getTaskID(); + TezVertexID tezVertexID = taskId.getVertexID(); + TezDAGID tezDAGID = tezVertexID.getDAGId(); + String fragFullId = Joiner.on('_').join(tezDAGID.getId(), tezVertexID.getId(), taskId.getId(), + taskAttemptID.getId()); taskReporter = new LlapTaskReporter( umbilical, confParams.amHeartbeatIntervalMsMax, confParams.amCounterHeartbeatInterval, confParams.amMaxEventsPerHeartbeat, new AtomicLong(0), - request.getContainerIdString()); + request.getContainerIdString(), + fragFullId); String attemptId = fragmentInfo.getFragmentIdentifierString(); IOContextMap.setThreadAttemptId(attemptId); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index a3d71c0..4ef5462 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -27,15 +27,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.DebugUtils; +import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; +import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; -import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter; import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer; import org.apache.hadoop.hive.llap.io.decode.ReadPipeline; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.AvoidSplitCombination; import org.apache.hadoop.hive.ql.io.LlapAwareSplit; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; @@ -53,7 +53,12 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hive.common.util.HiveStringUtils; +import org.apache.tez.common.counters.TezCounters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -120,6 +125,7 @@ private class LlapRecordReader implements RecordReader, Consumer { + private final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class); private final FileSplit split; private final List columnIds; private final SearchArgument sarg; @@ -147,7 +153,18 @@ public LlapRecordReader( this.columnIds = includedCols; this.sarg = ConvertAstToSearchArg.createFromConf(job); this.columnNames = ColumnProjectionUtils.getReadColumnNames(job); - this.counters = new QueryFragmentCounters(job); + String dagId = job.get("tez.mapreduce.dag.index"); + String vertexId = job.get("tez.mapreduce.vertex.index"); + String taskId = 
job.get("tez.mapreduce.task.index"); + String taskAttemptId = job.get("tez.mapreduce.task.attempt.index"); + Preconditions.checkNotNull(dagId, "DAG index is not set"); + Preconditions.checkNotNull(vertexId, "Vertex index is not set"); + Preconditions.checkNotNull(taskId, "Task index is not set"); + Preconditions.checkNotNull(taskAttemptId, "Task attempt index is not set"); + String fullId = Joiner.on('_').join(dagId, vertexId, taskId, taskAttemptId); + TezCounters taskCounters = FragmentCountersMap.getCountersForFragment(fullId); + LOG.info("Received dagid_vertexid_taskid_attempid: {}", fullId); + this.counters = new QueryFragmentCounters(job, taskCounters); this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName); MapWork mapWork = Utilities.getMapWork(job); @@ -192,7 +209,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti if (wasFirst) { firstReturnTime = counters.startTimeCounter(); } - counters.incrTimeCounter(Counter.CONSUMER_TIME_US, firstReturnTime); + counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_US, firstReturnTime); return false; } if (columnIds.size() != cvb.cols.length) { @@ -330,7 +347,7 @@ public void consumeData(ColumnVectorBatch data) { @Override public void setError(Throwable t) { - counters.incrCounter(QueryFragmentCounters.Counter.NUM_ERRORS); + counters.incrCounter(LlapIOCounters.NUM_ERRORS); LlapIoImpl.LOG.info("setError called; closed " + isClosed + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size()); assert t != null; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index 2597848..b2063fe 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; +import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; @@ -118,11 +119,11 @@ protected void decodeBatch(OrcEncodedColumnBatch batch, // we are done reading a batch, send it to consumer for processing downstreamConsumer.consumeData(cvb); - counters.incrCounter(QueryFragmentCounters.Counter.ROWS_EMITTED, batchSize); + counters.incrCounter(LlapIOCounters.ROWS_EMITTED, batchSize); } - counters.incrTimeCounter(QueryFragmentCounters.Counter.DECODE_TIME_US, startTime); - counters.incrCounter(QueryFragmentCounters.Counter.NUM_VECTOR_BATCHES, maxBatchesRG); - counters.incrCounter(QueryFragmentCounters.Counter.NUM_DECODED_BATCHES); + counters.incrTimeCounter(LlapIOCounters.DECODE_TIME_US, startTime); + counters.incrCounter(LlapIOCounters.NUM_VECTOR_BATCHES, maxBatchesRG); + counters.incrCounter(LlapIOCounters.NUM_DECODED_BATCHES); } catch (IOException e) { // Caller will return the batch. 
downstreamConsumer.setError(e); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index b36cf64..28b5853 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.List; +import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -47,7 +48,6 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCache; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; -import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; @@ -391,12 +391,12 @@ protected Void performDataRead() throws IOException { } isFoundInCache = (stripeMetadata != null); if (!isFoundInCache) { - counters.incrCounter(Counter.METADATA_CACHE_MISS); + counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS); ensureMetadataReader(); long startTimeHdfs = counters.startTimeCounter(); stripeMetadata = new OrcStripeMetadata(new OrcBatchKey(fileId, stripeIx, 0), metadataReader, stripe, stripeIncludes, sargColumns); - counters.incrTimeCounter(Counter.HDFS_TIME_US, startTimeHdfs); + counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_US, startTimeHdfs); if (hasFileId && metadataCache != null) { stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata); if (DebugUtils.isTraceOrcEnabled()) { @@ -413,11 +413,11 @@ protected Void performDataRead() throws IOException { + " metadata for includes: " + DebugUtils.toString(stripeIncludes)); } assert isFoundInCache; - counters.incrCounter(Counter.METADATA_CACHE_MISS); + counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS); ensureMetadataReader(); updateLoadedIndexes(stripeMetadata, stripe, stripeIncludes, sargColumns); } else if (isFoundInCache) { - counters.incrCounter(Counter.METADATA_CACHE_HIT); + counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT); } } catch (Throwable t) { consumer.setError(t); @@ -462,7 +462,7 @@ protected Void performDataRead() throws IOException { } private void recordReaderTime(long startTime) { - counters.incrTimeCounter(Counter.TOTAL_IO_TIME_US, startTime); + counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_US, startTime); } private static String getDbAndTableName(Path path) { @@ -571,7 +571,7 @@ private void updateLoadedIndexes(OrcStripeMetadata stripeMetadata, if (stripeMetadata.hasAllIndexes(stripeIncludes)) return; long startTime = counters.startTimeCounter(); stripeMetadata.loadMissingIndexes(metadataReader, stripe, stripeIncludes, sargColumns); - counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); + counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_US, startTime); } } @@ -610,7 +610,7 @@ private void ensureOrcReader() throws IOException { long startTime = counters.startTimeCounter(); ReaderOptions opts = OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata); orcReader = EncodedOrcFile.createReader(path, opts); - counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); + 
counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_US, startTime); } /** @@ -621,10 +621,10 @@ private OrcFileMetadata getOrReadFileMetadata() throws IOException { if (fileId != null && metadataCache != null) { metadata = metadataCache.getFileMetadata(fileId); if (metadata != null) { - counters.incrCounter(Counter.METADATA_CACHE_HIT); + counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT); return metadata; } else { - counters.incrCounter(Counter.METADATA_CACHE_MISS); + counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS); } } ensureOrcReader(); @@ -651,14 +651,14 @@ private OrcFileMetadata getOrReadFileMetadata() throws IOException { value = metadataCache.getStripeMetadata(stripeKey); } if (value == null || !value.hasAllIndexes(globalInc)) { - counters.incrCounter(Counter.METADATA_CACHE_MISS); + counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS); ensureMetadataReader(); StripeInformation si = fileMetadata.getStripes().get(stripeIx); if (value == null) { long startTime = counters.startTimeCounter(); value = new OrcStripeMetadata(new OrcBatchKey(fileId, stripeIx, 0), metadataReader, si, globalInc, sargColumns); - counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); + counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_US, startTime); if (hasFileId && metadataCache != null) { value = metadataCache.putStripeMetadata(value); if (DebugUtils.isTraceOrcEnabled()) { @@ -676,7 +676,7 @@ private OrcFileMetadata getOrReadFileMetadata() throws IOException { updateLoadedIndexes(value, si, globalInc, sargColumns); } } else { - counters.incrCounter(Counter.METADATA_CACHE_HIT); + counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT); } result.add(value); consumer.setStripeMetadata(value); @@ -689,7 +689,7 @@ private void ensureMetadataReader() throws IOException { if (metadataReader != null) return; long startTime = counters.startTimeCounter(); metadataReader = orcReader.metadata(); - counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); + counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_US, startTime); } @Override @@ -772,7 +772,7 @@ private void adjustRgMetric(int rgCount, boolean[] rgsToRead, boolean isNone, } else if (!isNone) { count = rgCount; } - counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, count); + counters.setCounter(LlapIOCounters.SELECTED_ROWGROUPS, count); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index c8d135e..050ad25 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -26,7 +26,6 @@ import java.io.PrintStream; import java.text.DecimalFormat; import java.text.NumberFormat; -import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; import java.util.LinkedList; @@ -38,6 +37,7 @@ import java.util.TreeSet; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; @@ -72,9 +72,13 @@ private static final int COLUMN_1_WIDTH = 16; private static final int SEPARATOR_WIDTH = InPlaceUpdates.MIN_TERMINAL_WIDTH; + private static final String SEPARATOR = new String(new char[SEPARATOR_WIDTH]).replace("\0", "-"); + private static final String PREP_SUMMARY_HEADER = "DAG Preparation Summary"; + private static final String 
TASK_SUMMARY_HEADER = "Task Execution Summary"; + private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary"; // keep this within 80 chars width. If more columns needs to be added then update min terminal - // width requirement and separator width accordingly + // width requirement and SEPARATOR width accordingly private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s %6s "; private static final String VERTEX_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s %6s "; private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s"; @@ -82,12 +86,15 @@ "VERTICES", "MODE", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED"); // method and dag summary format - private static final String SUMMARY_HEADER_FORMAT = "%-16s %-12s %-12s %-12s %-19s %-19s %-15s %-15s %-15s"; - private static final String SUMMARY_VERTEX_FORMAT = "%-16s %11s %16s %12s %16s %18s %18s %14s %16s"; + private static final String SUMMARY_HEADER_FORMAT = "%10s %14s %13s %12s %14s %15s"; private static final String SUMMARY_HEADER = String.format(SUMMARY_HEADER_FORMAT, - "VERTICES", "TOTAL_TASKS", "FAILED_ATTEMPTS", "KILLED_TASKS", "DURATION_SECONDS", - "CPU_TIME_MILLIS", "GC_TIME_MILLIS", "INPUT_RECORDS", "OUTPUT_RECORDS"); + "VERTICES", "DURATION(ms)", "CPU_TIME(ms)", "GC_TIME(ms)", "INPUT_RECORDS", "OUTPUT_RECORDS"); + // LLAP counters + private static final String LLAP_SUMMARY_HEADER_FORMAT = "%10s %9s %13s %14s %9s %10s %11s %11s"; + private static final String LLAP_SUMMARY_HEADER = String.format(LLAP_SUMMARY_HEADER_FORMAT, + "VERTICES", "ROWGROUPS", "METADATA_HIT", "METADATA_MISS", "DATA_HIT", "DATA_MISS", + "ALLOCATION", "ALLOC_USED"); private static final String TOTAL_PREP_TIME = "TotalPrepTime"; private static final String METHOD = "METHOD"; private static final String DURATION = "DURATION(ms)"; @@ -95,7 +102,6 @@ // in-place progress update related variables private int lines; private final PrintStream out; - private String separator; private transient LogHelper console; private final PerfLogger perfLogger = SessionState.getPerfLogger(); @@ -142,10 +148,6 @@ public TezJobMonitor(Map workMap) { // all progress updates are written to info stream and log file. 
In-place updates can only be // done to info stream (console) out = console.getInfoStream(); - separator = ""; - for (int i = 0; i < SEPARATOR_WIDTH; i++) { - separator += "-"; - } } /** @@ -219,7 +221,8 @@ public int monitorExecution(final DAGClient dagClient, HiveConf conf, Set opts = new HashSet(); long startTime = 0; boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || - Utilities.isPerfOrAboveLogging(conf); + Utilities.isPerfOrAboveLogging(conf); + boolean llapIoEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_IO_ENABLED); boolean inPlaceEligible = InPlaceUpdates.inPlaceEligible(conf); synchronized(shutdownList) { @@ -285,8 +288,23 @@ public int monitorExecution(final DAGClient dagClient, HiveConf conf, + String.format("%.2f seconds", duration)); console.printInfo("\n"); + console.printInfo(PREP_SUMMARY_HEADER); printMethodsSummary(); + console.printInfo(SEPARATOR); + console.printInfo(""); + + console.printInfo(TASK_SUMMARY_HEADER); printDagSummary(progressMap, console, dagClient, conf, dag); + console.printInfo(SEPARATOR); + console.printInfo(""); + + if (llapIoEnabled) { + console.printInfo(LLAP_IO_SUMMARY_HEADER); + printLlapIOSummary(progressMap, console, dagClient); + console.printInfo(SEPARATOR); + } + + console.printInfo("\n"); } running = false; done = true; @@ -408,7 +426,9 @@ private void printMethodsSummary() { /* Build the method summary header */ String methodBreakdownHeader = String.format("%-30s %-13s", METHOD, DURATION); - console.printInfo(methodBreakdownHeader); + console.printInfo(SEPARATOR); + reprintLineWithColorAsBold(methodBreakdownHeader, Ansi.Color.CYAN); + console.printInfo(SEPARATOR); for (String method : perfLoggerReportMethods) { long duration = perfLogger.getDuration(method); @@ -423,7 +443,7 @@ private void printMethodsSummary() { totalInPrepTime = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) - perfLogger.getStartTime(PerfLogger.TIME_TO_SUBMIT); - console.printInfo(String.format("%-30s %11s\n", TOTAL_PREP_TIME, commaFormat.format( + console.printInfo(String.format("%-30s %11s", TOTAL_PREP_TIME, commaFormat.format( totalInPrepTime))); } @@ -448,18 +468,16 @@ private void printDagSummary(Map progressMap, LogHelper consol } /* Print the per Vertex summary */ - console.printInfo(SUMMARY_HEADER); + console.printInfo(SEPARATOR); + reprintLineWithColorAsBold(SUMMARY_HEADER, Ansi.Color.CYAN); + console.printInfo(SEPARATOR); SortedSet keys = new TreeSet(progressMap.keySet()); Set statusOptions = new HashSet(1); statusOptions.add(StatusGetOpts.GET_COUNTERS); for (String vertexName : keys) { Progress progress = progressMap.get(vertexName); if (progress != null) { - final int totalTasks = progress.getTotalTaskCount(); - final int failedTaskAttempts = progress.getFailedTaskAttemptCount(); - final int killedTaskAttempts = progress.getKilledTaskAttemptCount(); - final double duration = - perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName) / 1000.0; + final double duration = perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName); VertexStatus vertexStatus = null; try { vertexStatus = dagClient.getVertexStatus(vertexName, statusOptions); @@ -540,11 +558,8 @@ private void printDagSummary(Map progressMap, LogHelper consol + vertexName.replace(" ", "_")) + hiveOutputIntermediateRecords; - String vertexExecutionStats = String.format(SUMMARY_VERTEX_FORMAT, + String vertexExecutionStats = String.format(SUMMARY_HEADER_FORMAT, vertexName, - totalTasks, - failedTaskAttempts, - killedTaskAttempts, 
secondsFormat.format((duration)), commaFormat.format(cpuTimeMillis), commaFormat.format(gcTimeMillis), @@ -555,6 +570,68 @@ private void printDagSummary(Map progressMap, LogHelper consol } } + + private String humanReadableByteCount(long bytes) { + int unit = 1000; // use binary units instead? + if (bytes < unit) { + return bytes + "B"; + } + int exp = (int) (Math.log(bytes) / Math.log(unit)); + String suffix = "KMGTPE".charAt(exp-1) + ""; + return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix); + } + + private void printLlapIOSummary(Map progressMap, LogHelper console, + DAGClient dagClient) throws Exception { + SortedSet keys = new TreeSet<>(progressMap.keySet()); + Set statusOptions = new HashSet<>(1); + statusOptions.add(StatusGetOpts.GET_COUNTERS); + boolean first = false; + String counterGroup = LlapIOCounters.class.getName(); + for (String vertexName : keys) { + // Reducers do not benefit from LLAP IO so no point in printing + if (vertexName.startsWith("Reducer")) { + continue; + } + TezCounters vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions) + .getVertexCounters(); + if (vertexCounters != null) { + final long selectedRowgroups = getCounterValueByGroupName(vertexCounters, + counterGroup, "SELECTED_ROWGROUPS"); + final long metadataCacheHit = getCounterValueByGroupName(vertexCounters, + counterGroup, "METADATA_CACHE_HIT"); + final long metadataCacheMiss = getCounterValueByGroupName(vertexCounters, + counterGroup, "METADATA_CACHE_MISS"); + final long cacheHitBytes = getCounterValueByGroupName(vertexCounters, + counterGroup, "CACHE_HIT_BYTES"); + final long cacheMissBytes = getCounterValueByGroupName(vertexCounters, + counterGroup, "CACHE_MISS_BYTES"); + final long allocatedBytes = getCounterValueByGroupName(vertexCounters, + counterGroup, "ALLOCATED_BYTES"); + final long allocatedUsedBytes = getCounterValueByGroupName(vertexCounters, + counterGroup, "ALLOCATED_USED_BYTES"); + + if (!first) { + console.printInfo(SEPARATOR); + reprintLineWithColorAsBold(LLAP_SUMMARY_HEADER, Ansi.Color.CYAN); + console.printInfo(SEPARATOR); + first = true; + } + + String queryFragmentStats = String.format(LLAP_SUMMARY_HEADER_FORMAT, + vertexName, + selectedRowgroups, + metadataCacheHit, + metadataCacheMiss, + humanReadableByteCount(cacheHitBytes), + humanReadableByteCount(cacheMissBytes), + humanReadableByteCount(allocatedBytes), + humanReadableByteCount(allocatedUsedBytes)); + console.printInfo(queryFragmentStats); + } + } + } + private void printStatusInPlace(Map progressMap, long startTime, boolean vextexStatusFromAM, DAGClient dagClient) { StringBuilder reportBuffer = new StringBuilder(); @@ -568,9 +645,9 @@ private void printStatusInPlace(Map progressMap, long startTim // ------------------------------------------------------------------------------- // VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED // ------------------------------------------------------------------------------- - reprintLine(separator); + reprintLine(SEPARATOR); reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN); - reprintLine(separator); + reprintLine(SEPARATOR); SortedSet keys = new TreeSet(progressMap.keySet()); int idx = 0; @@ -663,11 +740,11 @@ private void printStatusInPlace(Map progressMap, long startTim // ------------------------------------------------------------------------------- // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s // ------------------------------------------------------------------------------- - reprintLine(separator); + 
reprintLine(SEPARATOR); final float progress = (sumTotal == 0) ? 0.0f : (float) sumComplete / (float) sumTotal; String footer = getFooter(keys.size(), completed.size(), progress, startTime); reprintLineWithColorAsBold(footer, Ansi.Color.RED); - reprintLine(separator); + reprintLine(SEPARATOR); } private String getMode(String name, Map workMap) {
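Review note on the new FragmentCountersMap: it is a static, process-wide registry that lets the LLAP IO side find the TezCounters instance that LlapTaskReporter created for the running fragment. A minimal sketch of the intended round trip is below; the classes, counter names, and key format come from the patch, while the standalone main() harness and the sample key value are only illustrative.

```java
import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.tez.common.counters.TezCounters;

public class FragmentCountersRoundTrip {
  public static void main(String[] args) {
    // Key format produced by TaskRunnerCallable: dagId_vertexId_taskId_attemptId (sample value).
    String fragmentFullId = "1_0_3_0";

    // 1. LlapTaskReporter.registerTask(): publish the counters for this fragment.
    TezCounters tezCounters = new TezCounters();
    FragmentCountersMap.registerCountersForFragment(fragmentFullId, tezCounters);

    // 2. LlapRecordReader: look the counters up and hand them to QueryFragmentCounters,
    //    which mirrors every IO counter update into the Tez counter of the same name.
    TezCounters found = FragmentCountersMap.getCountersForFragment(fragmentFullId);
    found.findCounter(LlapIOCounters.CACHE_HIT_BYTES).increment(4096);

    // 3. LlapTaskReporter.unregisterTask(): drop the reference when the fragment finishes.
    FragmentCountersMap.unregisterCountersForFragment(fragmentFullId);

    // The registered object itself still holds the accumulated value.
    System.out.println(tezCounters.findCounter(LlapIOCounters.CACHE_HIT_BYTES).getValue()); // 4096
  }
}
```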
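The registry only works if both sides agree on the key. TaskRunnerCallable builds it from the numeric parts of the TezTaskAttemptID, and LlapRecordReader rebuilds it from the tez.mapreduce.*.index entries in the JobConf. A hedged sketch of the two paths; the conf key names are taken from the patch, the plain Map stands in for JobConf, and the sample values are invented.

```java
import java.util.HashMap;
import java.util.Map;

import com.google.common.base.Joiner;

public class FragmentIdSketch {
  /** Producer side (TaskRunnerCallable): numeric parts of the TezTaskAttemptID hierarchy. */
  static String fromTezIds(int dagId, int vertexId, int taskId, int attemptId) {
    return Joiner.on('_').join(dagId, vertexId, taskId, attemptId);
  }

  /** Consumer side (LlapRecordReader): the same parts, published as strings in the JobConf. */
  static String fromJobConf(Map<String, String> conf) {
    return Joiner.on('_').join(
        conf.get("tez.mapreduce.dag.index"),
        conf.get("tez.mapreduce.vertex.index"),
        conf.get("tez.mapreduce.task.index"),
        conf.get("tez.mapreduce.task.attempt.index"));
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("tez.mapreduce.dag.index", "1");
    conf.put("tez.mapreduce.vertex.index", "0");
    conf.put("tez.mapreduce.task.index", "3");
    conf.put("tez.mapreduce.task.attempt.index", "0");
    // Both sides must produce the same key, e.g. "1_0_3_0", or the lookup returns null
    // and QueryFragmentCounters falls back to local-only counting.
    System.out.println(fromTezIds(1, 0, 3, 0).equals(fromJobConf(conf))); // true
  }
}
```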
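QueryFragmentCounters now does double bookkeeping: the existing AtomicLongArray still backs the per-fragment toString() log line, and, when a TezCounters object was registered for the fragment, every update is mirrored into the matching Tez counter so the values reach the AM through the normal heartbeat path. A stripped-down sketch of that pattern, assuming the same LlapIOCounters enum; tezCounters.findCounter(counter) is used here and is equivalent to the LlapIOCounters.values()[counter.ordinal()] indirection in the patch.

```java
import java.util.concurrent.atomic.AtomicLongArray;

import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.tez.common.counters.TezCounters;

/** Minimal sketch of the double bookkeeping in QueryFragmentCounters. */
class DualCounters {
  private final AtomicLongArray fixed = new AtomicLongArray(LlapIOCounters.values().length);
  private final TezCounters tezCounters; // may be null if no counters were registered

  DualCounters(TezCounters tezCounters) {
    this.tezCounters = tezCounters;
  }

  void incrCounter(LlapIOCounters counter, long delta) {
    fixed.addAndGet(counter.ordinal(), delta);             // local, lock-free copy for logging
    if (tezCounters != null) {
      tezCounters.findCounter(counter).increment(delta);   // visible to the Tez AM via heartbeats
    }
  }
}
```

One visible side effect of the mirroring: when LLAP_ORC_ENABLE_TIME_COUNTERS is off, the constructor still pre-sets the *_TIME_US counters to -1, so the mirrored Tez counters report -1 rather than being absent.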
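On the monitoring side, printLlapIOSummary reads the values back from the per-vertex counters. Because the counters are updated through findCounter(enum), they land in a counter group named after the enum class, which is why counterGroup is LlapIOCounters.class.getName(). getCounterValueByGroupName is an existing TezJobMonitor helper that is not part of this diff; it presumably amounts to something like the sketch below (the zero default for an untouched counter relies on Tez creating missing groups and counters on access).

```java
import org.apache.tez.common.counters.TezCounters;

final class LlapCounterLookup {
  private LlapCounterLookup() {}

  /** Roughly what getCounterValueByGroupName(vertexCounters, group, name) is expected to do. */
  static long counterValue(TezCounters vertexCounters, String groupName, String counterName) {
    // Tez creates missing groups and counters on access, so an untouched counter reads as 0.
    return vertexCounters.getGroup(groupName).findCounter(counterName).getValue();
  }
}
```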
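humanReadableByteCount uses decimal (1000-based) units, as its own comment hints. A few worked values, using a copy of the method purely for illustration:

```java
public class ByteCountDemo {
  // Copy of the humanReadableByteCount logic from the patch, for illustration only.
  static String humanReadableByteCount(long bytes) {
    int unit = 1000; // decimal (SI) units; a binary variant would divide by 1024 instead
    if (bytes < unit) {
      return bytes + "B";
    }
    int exp = (int) (Math.log(bytes) / Math.log(unit));
    String suffix = "KMGTPE".charAt(exp - 1) + "";
    return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix);
  }

  public static void main(String[] args) {
    System.out.println(humanReadableByteCount(500));       // 500B
    System.out.println(humanReadableByteCount(4096));      // 4.10KB
    System.out.println(humanReadableByteCount(1_500_000)); // 1.50MB
  }
}
```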
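For a sense of the console output, the "LLAP IO Summary" rows are plain String.format calls over LLAP_SUMMARY_HEADER_FORMAT, which keeps the header and data lines column-aligned within the 80-character separator width. A small demo with invented numbers:

```java
public class LlapSummaryRowDemo {
  // Same format string as LLAP_SUMMARY_HEADER_FORMAT in TezJobMonitor.
  private static final String LLAP_SUMMARY_HEADER_FORMAT = "%10s %9s %13s %14s %9s %10s %11s %11s";

  public static void main(String[] args) {
    String header = String.format(LLAP_SUMMARY_HEADER_FORMAT,
        "VERTICES", "ROWGROUPS", "METADATA_HIT", "METADATA_MISS", "DATA_HIT", "DATA_MISS",
        "ALLOCATION", "ALLOC_USED");
    // Sample row: byte-valued columns are pre-formatted with humanReadableByteCount().
    String row = String.format(LLAP_SUMMARY_HEADER_FORMAT,
        "Map 1", 12, 4, 1, "18.20MB", "2.40MB", "32.00MB", "20.10MB");
    System.out.println(header);
    System.out.println(row);
  }
}
```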