diff --git llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
new file mode 100644
index 0000000..bc7c2a8
--- /dev/null
+++ llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.counters;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Per query counters.
+ */
+public class QueryFragmentCounters {
+
+  public static enum Counter {
+    NUM_VECTOR_BATCHES,
+    NUM_DECODED_BATCHES,
+    SELECTED_ROWGROUPS,
+    NUM_ERRORS,
+    ROWS_EMITTED
+  }
+
+  private String appId;
+  private Map<String, Long> counterMap;
+
+  public QueryFragmentCounters() {
+    this("Not Specified");
+  }
+
+  public QueryFragmentCounters(String applicationId) {
+    this.appId = applicationId;
+    this.counterMap = new ConcurrentHashMap<>();
+  }
+
+  public void incrCounter(Counter counter) {
+    incrCounter(counter, 1);
+  }
+
+  public void incrCounter(Counter counter, long delta) {
+    if (counterMap.containsKey(counter.name())) {
+      long val = counterMap.get(counter.name());
+      counterMap.put(counter.name(), val + delta);
+    } else {
+      setCounter(counter, delta);
+    }
+  }
+
+  public void setCounter(Counter counter, long value) {
+    counterMap.put(counter.name(), value);
+  }
+
+  @Override
+  public String toString() {
+    return "ApplicationId: " + appId + " Counters: " + counterMap;
+  }
+}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index fca9907..e9d3443 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -167,7 +167,6 @@ public void queueContainer(RunContainerRequestProto request) throws IOException
     LOG.info("DEBUG: Registering request with the ShuffleHandler");
     ShuffleHandler.get().registerApplication(request.getApplicationIdString(),
         jobToken, request.getUser());
-
     ContainerRunnerCallable callable = new ContainerRunnerCallable(request, new Configuration(getConfig()),
         new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs, workingDir,
         credentials, memoryPerExecutor);
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 98ef3b0..f4c3cb2 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -110,12 +111,14 @@
     /** Vector that is currently being processed by our user. */
     private boolean isDone = false, isClosed = false;
     private ConsumerFeedback<ColumnVectorBatch> feedback;
+    private final QueryFragmentCounters counters;
 
     public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols) {
       this.split = split;
       this.columnIds = includedCols;
       this.sarg = SearchArgumentFactory.createFromConf(job);
       this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
+      this.counters = new QueryFragmentCounters();
       try {
         rbCtx = new VectorizedRowBatchCtx();
         rbCtx.init(job, split);
@@ -175,7 +178,7 @@ public void onFailure(Throwable t) {
 
     private void startRead() {
       // Create the consumer of encoded data; it will coordinate decoding to CVBs.
-      ReadPipeline rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames);
+      ReadPipeline rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames, counters);
       feedback = rp;
       ListenableFuture<Void> future = executor.submit(rp.getReadCallable());
       // TODO: we should NOT do this thing with handler. Reader needs to do cleanup in most cases.
@@ -235,6 +238,7 @@ public void close() throws IOException {
         LlapIoImpl.LOG.info("close called; closed " + isClosed + ", done " + isDone
             + ", err " + pendingError + ", pending " + pendingData.size());
       }
+      LlapIoImpl.LOG.info("QueryFragmentCounters: " + counters);
       feedback.stop();
       rethrowErrorIfAny();
     }
@@ -253,6 +257,7 @@ public void setDone() {
         LlapIoImpl.LOG.info("setDone called; closed " + isClosed + ", done " + isDone
             + ", err " + pendingError + ", pending " + pendingData.size());
       }
+      LlapIoImpl.LOG.info("DONE: QueryFragmentCounters: " + counters);
       synchronized (pendingData) {
         isDone = true;
         pendingData.notifyAll();
@@ -276,9 +281,11 @@ public void consumeData(ColumnVectorBatch data) {
 
     @Override
     public void setError(Throwable t) {
+      counters.incrCounter(QueryFragmentCounters.Counter.NUM_ERRORS);
       LlapIoImpl.LOG.info("setError called; closed " + isClosed + ", done " + isDone
           + ", err " + pendingError + ", pending " + pendingData.size());
       assert t != null;
+      LlapIoImpl.LOG.info("ERROR: QueryFragmentCounters: " + counters);
       synchronized (pendingData) {
         pendingError = t;
         pendingData.notifyAll();
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index c1f12d6..f561270 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -21,6 +21,7 @@
 import java.util.List;
 
 import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.mapred.InputSplit;
@@ -30,5 +31,6 @@
  */
 public interface ColumnVectorProducer {
   ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, InputSplit split,
-      List<Integer> columnIds, SearchArgument sarg, String[] columnNames);
+      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+      QueryFragmentCounters counters);
 }
\ No newline at end of file
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 56d3fe2..1f20f0c 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
@@ -43,7 +44,7 @@
   private final Configuration conf;
   private boolean _skipCorrupt; // TODO: get rid of this
   private LlapDaemonCacheMetrics metrics;
-  
+
   public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
       LowLevelCacheImpl lowLevelCache, Cache<OrcCacheKey> cache, Configuration conf,
       LlapDaemonCacheMetrics metrics) {
@@ -62,12 +63,13 @@ public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
   @Override
   public ReadPipeline createReadPipeline(
       Consumer<ColumnVectorBatch> consumer, InputSplit split,
-      List<Integer> columnIds, SearchArgument sarg, String[] columnNames) {
+      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+      QueryFragmentCounters counters) {
     metrics.incrCacheReadRequests();
-    OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
-        consumer, columnIds.size(), _skipCorrupt);
-    OrcEncodedDataReader reader = new OrcEncodedDataReader(
-        lowLevelCache, cache, metadataCache, conf, split, columnIds, sarg, columnNames, edc);
+    OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
+        _skipCorrupt, counters);
+    OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, cache, metadataCache,
+        conf, split, columnIds, sarg, columnNames, edc, counters);
     edc.init(reader, reader);
     return edc;
   }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index b4c0fe3..72cbc43 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -20,9 +20,9 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
-import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
 import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.BinaryStreamReader;
@@ -52,11 +52,14 @@
   private OrcFileMetadata fileMetadata; // We assume one request is only for one file.
   private OrcStripeMetadata[] stripes;
   private final boolean skipCorrupt; // TODO: get rid of this
+  private final QueryFragmentCounters counters;
 
   public OrcEncodedDataConsumer(
-      Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt) {
+      Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt,
+      QueryFragmentCounters counters) {
     super(consumer, colCount);
     this.skipCorrupt = skipCorrupt;
+    this.counters = counters;
   }
 
   public void setFileMetadata(OrcFileMetadata f) {
@@ -109,7 +112,10 @@ protected void decodeBatch(EncodedColumnBatch batch,
 
         // we are done reading a batch, send it to consumer for processing
         downstreamConsumer.consumeData(cvb);
+        counters.incrCounter(QueryFragmentCounters.Counter.ROWS_EMITTED, batchSize);
       }
+      counters.incrCounter(QueryFragmentCounters.Counter.NUM_VECTOR_BATCHES, maxBatchesRG);
+      counters.incrCounter(QueryFragmentCounters.Counter.NUM_DECODED_BATCHES);
     } catch (IOException e) {
       // Caller will return the batch.
       downstreamConsumer.setError(e);
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 2cc6614..a2d8b6f 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -8,13 +8,13 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.CallableWithNdc;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.Cache;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
@@ -30,12 +30,12 @@
 import org.apache.hadoop.hive.ql.io.orc.MetadataReader;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
 import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
@@ -52,7 +52,7 @@
   private final SearchArgument sarg;
   private final String[] columnNames;
   private final OrcEncodedDataConsumer consumer;
-  
+  private final QueryFragmentCounters counters;
 
   // Read state.
   private int stripeIxFrom;
@@ -70,7 +70,7 @@
   public OrcEncodedDataReader(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
       OrcMetadataCache metadataCache, Configuration conf, InputSplit split,
       List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
-      OrcEncodedDataConsumer consumer) {
+      OrcEncodedDataConsumer consumer, QueryFragmentCounters counters) {
     this.lowLevelCache = lowLevelCache;
     this.metadataCache = metadataCache;
     this.cache = cache;
@@ -83,6 +83,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, Cache cach
     this.sarg = sarg;
     this.columnNames = columnNames;
     this.consumer = consumer;
+    this.counters = counters;
   }
 
   @Override
@@ -459,6 +460,17 @@ private void determineRgsToRead(List stripes, List type
 
       readState[stripeIxMod][j] = (rgsToRead == null) ? null :
           Arrays.copyOf(rgsToRead, rgsToRead.length);
     }
+
+    int count = 0;
+    if (rgsToRead != null) {
+      for (boolean b : rgsToRead) {
+        if (b)
+          count++;
+      }
+    } else {
+      count = rgCount;
+    }
+    counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, count);
   }
 }
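
For reference, a minimal sketch (not part of the patch) of how the counter API introduced above is used; the driver class name and the application id are hypothetical, but every QueryFragmentCounters call matches the methods added in this patch:

// Hypothetical driver; exercises QueryFragmentCounters the same way
// LlapRecordReader and OrcEncodedDataConsumer do in the patch above.
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;

public class QueryFragmentCountersExample {
  public static void main(String[] args) {
    // appId is a made-up example; the no-arg constructor defaults it to "Not Specified".
    QueryFragmentCounters counters = new QueryFragmentCounters("application_0000000000000_0000");
    counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, 8); // overwrite
    counters.incrCounter(QueryFragmentCounters.Counter.NUM_DECODED_BATCHES);  // +1
    counters.incrCounter(QueryFragmentCounters.Counter.ROWS_EMITTED, 1024);   // +delta
    // Prints the same string LlapRecordReader logs from close()/setDone()/setError().
    System.out.println(counters);
  }
}

One caveat worth noting: the containsKey/get/put sequence in incrCounter is not atomic even on a ConcurrentHashMap, so concurrent increments of the same counter can lose updates; callers sharing one instance across threads would need an atomic replace loop or atomic values.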