diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d94ff85..e131dc3 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1804,6 +1804,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "in the number of rows filtered by a certain operator, which in turn might lead to overprovision or\n" +
         "underprovision of resources. This factor is applied to the cardinality estimation of IN clauses in\n" +
         "filter operators."),
+    HIVE_STATS_CACHE_RUNTIME_STATS("hive.stats.cache.runtime.stats", true,
+        "Collect and cache runtime statistics. Subsequent runs of the same query will use the statistics\n" +
+        "collected during the previous run."),
 
     // Concurrency
     HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index 2ddabd9..5af20e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -20,7 +20,11 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
@@ -35,7 +39,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -61,6 +64,8 @@
 import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.transport.TMemoryBuffer;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * QueryPlan can be serialized to disk so that we can restart/resume the
  * progress of it in the future, either within or outside of the current
@@ -111,6 +116,8 @@
   private final Set<WriteEntity> acidSinks;
   private Boolean autoCommitValue;
 
+  private transient MessageDigest md;
+
   public QueryPlan() {
     this(null);
   }
@@ -826,4 +833,16 @@ public HiveOperation getOperation() {
   public Boolean getAutoCommitValue() {
     return autoCommitValue;
   }
+
+  public static String getQueryMD5(String queryString) throws NoSuchAlgorithmException {
+    MessageDigest md = MessageDigest.getInstance("MD5");
+    String query = normalizeQuery(queryString);
+    byte[] hash = md.digest(query.getBytes(StandardCharsets.UTF_8));
+    return Base64.getEncoder().encodeToString(hash);
+  }
+
+  public static String normalizeQuery(final String queryString) {
+    return queryString.replaceAll("\\s+", " ").replaceAll("`", "").trim().toLowerCase();
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 73ddf86..c095d3b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -83,7 +83,10 @@
   protected final AtomicBoolean abortOp;
   private transient ExecMapperContext execContext;
   private transient boolean rootInitializeCalled = false;
-  protected transient long runTimeNumRows;
+  protected final transient LongWritable runTimeRowsWritable = new LongWritable();
+  protected final transient LongWritable recordCounter = new LongWritable();
+  protected transient long numRows = 0;
+  protected transient long runTimeNumRows = 0;
   protected int indexForTezUnion = -1;
   private transient Configuration hconf;
   protected final transient Collection<Future<?>> asyncInitOperations = new HashSet<>();
@@ -107,6 +110,14 @@
     // one of its parent is not in state CLOSE..
   }
 
+  /**
+   * Counters.
+   */
+  public enum Counter {
+    RECORDS_OUT_OPERATOR,
+    RECORDS_OUT_INTERMEDIATE
+  }
+
   protected transient State state = State.UNINIT;
 
   private boolean useBucketizedHiveInputFormat;
@@ -226,7 +237,6 @@ public RowSchema getSchema() {
   @SuppressWarnings("rawtypes")
   protected transient OutputCollector out;
   protected transient final Logger LOG = LoggerFactory.getLogger(getClass().getName());
-  protected transient final Logger PLOG = LoggerFactory.getLogger(Operator.class.getName()); // for simple disabling logs from all operators
   protected transient String alias;
   protected transient Reporter reporter;
   protected String id;
@@ -492,6 +502,16 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     this.hconf = hconf;
     rootInitializeCalled = true;
     runTimeNumRows = 0;
+    statsMap.put(Counter.RECORDS_OUT_OPERATOR.name() + "_" + getOperatorId(), runTimeRowsWritable);
+    statsMap.put(getCounterName(Counter.RECORDS_OUT_INTERMEDIATE, hconf), recordCounter);
+  }
+
+  public String getCounterName(Counter counter, Configuration hconf) {
+    String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+    if (context != null && !context.isEmpty()) {
+      context = "_" + context.replace(" ", "_");
+    }
+    return counter + context;
   }
 
   /**
@@ -740,7 +760,8 @@ protected void closeOp(boolean abort) throws HiveException {
     if (conf != null && conf.getRuntimeStatsTmpDir() != null) {
       publishRunTimeStats();
     }
-    runTimeNumRows = 0;
+    runTimeRowsWritable.set(runTimeNumRows);
+    recordCounter.set(numRows);
   }
 
   private boolean jobCloseDone = false;
@@ -966,12 +987,6 @@ private void baseForward(Object row, ObjectInspector rowInspector)
     }
   }
 
-  public void resetStats() {
-    for (String e : statsMap.keySet()) {
-      statsMap.get(e).set(0L);
-    }
-  }
-
   public void reset(){
     this.state=State.INIT;
     if (childOperators != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 92741ee..0c6af2c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -62,13 +62,6 @@
 public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
     implements Serializable, TopNHash.BinaryCollector {
 
-  /**
-   * Counters.
-   */
-  public static enum Counter {
-    RECORDS_OUT_INTERMEDIATE
-  }
-
   private static final long serialVersionUID = 1L;
   private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance();
@@ -140,10 +133,8 @@
   // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
   protected transient Object[][] cachedKeys;
 
-  protected transient long numRows = 0;
   protected transient long cntr = 1;
   protected transient long logEveryNRows = 0;
-  private final transient LongWritable recordCounter = new LongWritable();
 
   /** Kryo ctor. */
   protected ReduceSinkOperator() {
@@ -163,8 +154,6 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       cntr = 1;
       logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
 
-      statsMap.put(getCounterName(Counter.RECORDS_OUT_INTERMEDIATE, hconf), recordCounter);
-
       List<ExprNodeDesc> keys = conf.getKeyCols();
 
       if (LOG.isDebugEnabled()) {
@@ -248,15 +237,6 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     }
   }
 
-  public String getCounterName(Counter counter, Configuration hconf) {
-    String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
-    if (context != null && !context.isEmpty()) {
-      context = "_" + context.replace(" ", "_");
-    }
-    return counter + context;
-  }
-
-
   /**
    * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
    * column indices for group by.
@@ -539,6 +519,7 @@ protected void closeOp(boolean abort) throws HiveException {
     if (!abort && reducerHash != null) {
       reducerHash.flush();
     }
+    runTimeNumRows = numRows;
     super.closeOp(abort);
     out = null;
     random = null;
@@ -546,7 +527,6 @@
     if (LOG.isTraceEnabled()) {
       LOG.info(toString() + ": records written - " + numRows);
     }
-    recordCounter.set(numRows);
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
index 496af0b..9f0be1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
@@ -28,16 +28,13 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter;
 import org.apache.hadoop.hive.ql.exec.TerminalOperator;
 import org.apache.hadoop.hive.ql.exec.TopNHash;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
-import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -55,9 +52,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hive.common.util.HashCodeUtil;
 
 import com.google.common.base.Preconditions;
@@ -134,10 +129,8 @@
   // Where to write our key and value pairs.
   private transient OutputCollector out;
 
-  private transient long numRows = 0;
   private transient long cntr = 1;
   private transient long logEveryNRows = 0;
-  private final transient LongWritable recordCounter = new LongWritable();
 
   // For debug tracing: the name of the map or reduce task.
   protected transient String taskName;
 
@@ -310,7 +303,6 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     if (context != null && !context.isEmpty()) {
       context = "_" + context.replace(" ","_");
     }
-    statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter);
 
     reduceSkipTag = conf.getSkipTag();
     reduceTagByte = (byte) conf.getTag();
@@ -435,13 +427,13 @@ protected void closeOp(boolean abort) throws HiveException {
     if (!abort && reducerHash != null) {
       reducerHash.flush();
     }
+    runTimeNumRows = numRows;
     super.closeOp(abort);
     out = null;
     reducerHash = null;
     if (LOG.isInfoEnabled()) {
       LOG.info(toString() + ": records written - " + numRows);
     }
-    recordCounter.set(numRows);
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkEmptyKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkEmptyKeyOperator.java
index bb7d677..c125b8b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkEmptyKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkEmptyKeyOperator.java
@@ -18,53 +18,15 @@
 
 package org.apache.hadoop.hive.ql.exec.vector.reducesink;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter;
-import org.apache.hadoop.hive.ql.exec.TerminalOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
-import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
-import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized;
-import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
-import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
-import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
-import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hive.common.util.HashCodeUtil;
 
 import com.google.common.base.Preconditions;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
index 15581ae..c0ae1bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
@@ -18,53 +18,25 @@
 
 package org.apache.hadoop.hive.ql.exec.vector.reducesink;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter;
-import org.apache.hadoop.hive.ql.exec.TerminalOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
-import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized;
-import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
-import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
 import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
-import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hive.common.util.HashCodeUtil;
 
 import com.google.common.base.Preconditions;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java
index 3acae94..2e20de4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java
@@ -18,52 +18,17 @@
 
 package org.apache.hadoop.hive.ql.exec.vector.reducesink;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter;
-import org.apache.hadoop.hive.ql.exec.TerminalOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
-import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized;
-import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
-import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
-import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
-import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hive.common.util.HashCodeUtil;
 
 import com.google.common.base.Preconditions;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/TezRuntimeStatisticsHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/TezRuntimeStatisticsHook.java
new file mode 100644
index 0000000..8b6bacf
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/TezRuntimeStatisticsHook.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorStats;
+import org.apache.hadoop.hive.ql.plan.RuntimeStatisticsCache;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Post-execution hook that collects runtime statistics from Tez vertex counters
+ * and caches them for a given query.
+ */
+public class TezRuntimeStatisticsHook implements ExecuteWithHookContext {
+  private static final Logger LOG = LoggerFactory.getLogger(TezRuntimeStatisticsHook.class.getName());
+
+  @Override
+  public void run(HookContext hookContext) throws Exception {
+    // don't run this hook for explain queries
+    if (hookContext.getQueryPlan().isExplain()) {
+      return;
+    }
+    switch (hookContext.getHookType()) {
+      case POST_EXEC_HOOK:
+      case ON_FAILURE_HOOK:
+        updateRuntimeStats(hookContext);
+        break;
+      default:
+        break;
+    }
+  }
+
+  private void updateRuntimeStats(final HookContext hookContext) throws NoSuchAlgorithmException {
+    HiveConf conf = hookContext.getConf();
+    if (!"tez".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
+      return;
+    }
+
+    LOG.info("Executing runtime statistics post-execution hook for Tez.");
+    QueryPlan plan = hookContext.getQueryPlan();
+    if (plan == null) {
+      return;
+    }
+
+    String queryMD5 = QueryPlan.getQueryMD5(plan.getQueryString());
+    LOG.info("MD5: {} for query: {}", queryMD5, QueryPlan.normalizeQuery(plan.getQueryString()));
+    Map<String, OperatorStats> opStats = RuntimeStatisticsCache.RUNTIME_STATS_CACHE.getIfPresent(queryMD5);
+    if (opStats == null) {
+      opStats = new HashMap<>();
+    }
+    List<TezTask> rootTasks = Utilities.getTezTasks(plan.getRootTasks());
+    for (TezTask tezTask : rootTasks) {
+      List<BaseWork> baseWorks = tezTask.getWork().getAllWork();
+      for (BaseWork baseWork : baseWorks) {
+        String vertexName = baseWork.getName();
+        LOG.info("Updating runtime statistics for tez task: {}", vertexName);
+        TezCounters counters = tezTask.getTezCounters();
+        if (counters != null) {
+          String groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+          for (Operator<? extends OperatorDesc> op : baseWork.getAllOperators()) {
+            String operatorId = op.getOperatorId();
+            OperatorStats operatorStats = null;
+            String counterName = Operator.Counter.RECORDS_OUT_OPERATOR.toString() + "_" + operatorId;
+            TezCounter tezCounter = counters.getGroup(groupName).findCounter(counterName, false);
+            if (tezCounter != null) {
+              if (operatorStats == null) {
+                operatorStats = new OperatorStats(operatorId);
+              }
+              operatorStats.setOutputRecords(tezCounter.getValue());
+            }
+
+            if (operatorStats != null) {
+              opStats.put(operatorId, operatorStats);
+            } else {
+              LOG.warn("Unable to get statistics for vertex: {} opId: {} groupName: {}", vertexName, operatorId,
+                  groupName);
+            }
+          }
+
+          // TODO: should it be uncompressed shuffle bytes?
+          String shuffleCounterName = TaskCounter.SHUFFLE_BYTES.toString();
+          List<Task<? extends Serializable>> parentTasks = tezTask.getParentTasks();
+          long shuffleBytesTotal = 0;
+          for (Task<? extends Serializable> pTask : parentTasks) {
+            String currTaskName = vertexName.replaceAll("\\s+", "_");
+            String pTaskName = pTask.getName().replaceAll("\\s+", "_");
+            String shuffleGroupName = TaskCounter.class.getSimpleName() + "_" + currTaskName + "_INPUT_" + pTaskName;
+            TezCounter tezCounter = counters.getGroup(shuffleGroupName).findCounter(shuffleCounterName, false);
+            if (tezCounter != null) {
+              shuffleBytesTotal += tezCounter.getValue();
+            }
+          }
+        }
+      }
+    }
+
+    if (!opStats.isEmpty()) {
+      // TODO: look at the DAG status before updating (some killed tasks could end up with wrong stats). Only update
+      // when the query SUCCEEDED or FAILED
+      LOG.info("Updating runtime stats: {} for queryMD5: {}", opStats, queryMD5);
+      RuntimeStatisticsCache.RUNTIME_STATS_CACHE.put(queryMD5, opStats);
+    }
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 13d7730..225b887 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -4306,6 +4311,11 @@ private static VectorPTFInfo createVectorPTFInfo(Operator stack, NodeProcessorCtx procCtx,
       try {
         // gather statistics for the first time and attach it to the table scan operator
         Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, colStatsCached, table, tsop);
-        tsop.setStatistics(stats.clone());
+        Statistics outStats = stats.clone();
+        // check if runtime stats are available
+        if (aspCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STATS_CACHE_RUNTIME_STATS)) {
+          ParseContext parseContext = aspCtx.getParseContext();
+          String queryString = parseContext.getContext().getCmd();
+          // if this is an explain plan, we need the underlying query so that we can show the runtime statistics
+          // in explain. TODO: this is required because of the byte-to-byte query string match
+          if (parseContext.getContext().isExplainPlan()) {
+            queryString = parseContext.getContext().getExplainConfig().getQuery();
+          }
+          String queryMD5 = null;
+          try {
+            queryMD5 = QueryPlan.getQueryMD5(queryString);
+            LOG.info("Compilation. MD5: {} query: {}", queryMD5, QueryPlan.normalizeQuery(queryString));
+          } catch (NoSuchAlgorithmException e) {
+            LOG.warn("Not checking for runtime stats as query MD5 threw exception.", e);
+          }
+          if (queryMD5 != null) {
+            Map<String, OperatorStats> opStats = RuntimeStatisticsCache.RUNTIME_STATS_CACHE.getIfPresent(queryMD5);
+            if (opStats != null) {
+              OperatorStats operatorStats = opStats.get(tsop.getOperatorId());
+              if (operatorStats != null) {
+                long oldRowCount = outStats.getNumRows();
+                long oldDataSize = outStats.getDataSize();
+                long newRowCount = operatorStats.getOutputRecords();
+                outStats.setNumRows(newRowCount);
+                long newDataSize = StatsUtils.getMaxIfOverflow(StatsUtils.getDataSizeFromColumnStats(
+                    outStats.getNumRows(), outStats.getColumnStats()));
+                outStats.setDataSize(newDataSize);
+                outStats.setRuntimeStats(true);
+                LOG.info("Runtime statistics available for op: {} updated rowCount: {} -> {} dataSize: {} -> {}",
+                    tsop.getOperatorId(), oldRowCount, newRowCount, oldDataSize, newDataSize);
+              }
+            }
+          }
+        }
+
+        tsop.setStatistics(outStats);
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("[0] STATS-" + tsop.toString() + " (" + table.getTableName() + "): " +
@@ -2338,6 +2380,42 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
           outStats.setColumnStats(colStats);
         }
+
+        // check if runtime stats are available
+        if (aspCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STATS_CACHE_RUNTIME_STATS)) {
+          ParseContext parseContext = aspCtx.getParseContext();
+          String queryString = parseContext.getContext().getCmd();
+          // if this is an explain plan, we need the underlying query so that we can show the runtime statistics
+          // in explain. TODO: this is required because of the byte-to-byte query string match
+          if (parseContext.getContext().isExplainPlan()) {
+            queryString = parseContext.getContext().getExplainConfig().getQuery();
+          }
+          String queryMD5 = null;
+          try {
+            queryMD5 = QueryPlan.getQueryMD5(queryString);
+            LOG.info("Compilation. MD5: {} query: {}", queryMD5, QueryPlan.normalizeQuery(queryString));
+          } catch (NoSuchAlgorithmException e) {
+            LOG.warn("Not checking for runtime stats as query MD5 threw exception.", e);
+          }
+          if (queryMD5 != null) {
+            Map<String, OperatorStats> opStats = RuntimeStatisticsCache.RUNTIME_STATS_CACHE.getIfPresent(queryMD5);
+            if (opStats != null) {
+              OperatorStats operatorStats = opStats.get(rop.getOperatorId());
+              if (operatorStats != null) {
+                long oldRowCount = outStats.getNumRows();
+                long oldDataSize = outStats.getDataSize();
+                long newRowCount = operatorStats.getOutputRecords();
+                outStats.setNumRows(newRowCount);
+                long newDataSize = StatsUtils.getMaxIfOverflow(StatsUtils.getDataSizeFromColumnStats(
+                    outStats.getNumRows(), outStats.getColumnStats()));
+                outStats.setDataSize(newDataSize);
+                outStats.setRuntimeStats(true);
+                LOG.info("Runtime statistics available for op: {} updated rowCount: {} -> {} dataSize: {} -> {}",
+                    rop.getOperatorId(), oldRowCount, newRowCount, oldDataSize, newDataSize);
+              }
+            }
+          }
+        }
         rop.setStatistics(outStats);
         if (LOG.isDebugEnabled()) {
           LOG.debug("[0] STATS-" + rop.toString() + ": " + outStats.extendedToString());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainConfiguration.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainConfiguration.java
index 1f118dc..2b59a06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainConfiguration.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainConfiguration.java
@@ -47,6 +47,7 @@
   private boolean vectorization = false;
   private boolean vectorizationOnly = false;
   private VectorizationDetailLevel vectorizationDetailLevel = VectorizationDetailLevel.SUMMARY;
+  private String query;
 
   private Path explainRootPath;
   private Map<String, Long> opIdToRuntimeNumRows;
@@ -153,4 +154,11 @@
   public void setOpIdToRuntimeNumRows(Map<String, Long> opIdToRuntimeNumRows) {
     this.opIdToRuntimeNumRows = opIdToRuntimeNumRows;
   }
+
+  public String getQuery() {
+    return query;
+  }
+
+  public void setQuery(final String query) {
+    this.query = query;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
index 7a0d4a7..17d1d44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
@@ -128,12 +128,16 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     ctx.setExplainPlan(true);
 
     ASTNode input = (ASTNode) ast.getChild(0);
+
+    // This is the actual query without the explain keyword + options
+    String query = ctx.getTokenRewriteStream().toString(input.getTokenStartIndex(),
+        input.getTokenStopIndex());
+    config.setQuery(query);
+
     // explain analyze is composed of two steps
     // step 1 (ANALYZE_STATE.RUNNING), run the query and collect the runtime #rows
     // step 2 (ANALYZE_STATE.ANALYZING), explain the query and provide the runtime #rows collected.
     if (config.getAnalyze() == AnalyzeState.RUNNING) {
-      String query = ctx.getTokenRewriteStream().toString(input.getTokenStartIndex(),
-          input.getTokenStopIndex());
       LOG.info("Explain analyze (running phase) for query " + query);
       Context runCtx = null;
       try {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 15836ec..cfc9337 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.TezRuntimeStatisticsHook;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -112,6 +113,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.Sets;
+
 /**
  * TezCompiler translates the operator plan into TezTasks.
  */
@@ -410,6 +416,23 @@ private void connect(Operator<?> o, AtomicInteger index, Stack<Operator<?>> nodes
   }
 
   private void runStatsAnnotation(OptimizeTezProcContext procCtx) throws SemanticException {
+    if (procCtx.conf.getBoolVar(ConfVars.HIVE_STATS_CACHE_RUNTIME_STATS)) {
+      Set<String> postHooks = Sets.newHashSet(Splitter.on(",").trimResults().omitEmptyStrings().split(
+          Strings.nullToEmpty(HiveConf.getVar(procCtx.conf, ConfVars.POSTEXECHOOKS))));
+      if (!postHooks.contains(TezRuntimeStatisticsHook.class.getName())) {
+        postHooks.add(TezRuntimeStatisticsHook.class.getName());
+        String updatedHooks = Joiner.on(",").join(postHooks);
+        procCtx.conf.setVar(ConfVars.POSTEXECHOOKS, updatedHooks);
+      }
+
+      Set<String> failureHooks = Sets.newHashSet(Splitter.on(",").trimResults().omitEmptyStrings().split(
+          Strings.nullToEmpty(HiveConf.getVar(procCtx.conf, ConfVars.ONFAILUREHOOKS))));
+      if (!failureHooks.contains(TezRuntimeStatisticsHook.class.getName())) {
+        failureHooks.add(TezRuntimeStatisticsHook.class.getName());
+        String updatedHooks = Joiner.on(",").join(failureHooks);
+        procCtx.conf.setVar(ConfVars.ONFAILUREHOOKS, updatedHooks);
+      }
+    }
     new AnnotateWithStatistics().transform(procCtx.parseContext);
     new AnnotateWithOpTraits().transform(procCtx.parseContext);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorStats.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorStats.java
new file mode 100644
index 0000000..62a869b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorStats.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+/**
+ * Runtime statistics (record counts and data sizes) observed for a single operator.
+ * A value of -1 means the corresponding metric was not collected.
+ */
+public class OperatorStats {
+  private String operatorId;
+  private long inputRecords;
+  private long outputRecords;
+  private long inputDataSize;
+  private long outputDataSize;
+
+  public OperatorStats(final String opId) {
+    this.operatorId = opId;
+    this.inputRecords = -1;
+    this.inputDataSize = -1;
+    this.outputRecords = -1;
+    this.outputDataSize = -1;
+  }
+
+  public long getInputRecords() {
+    return inputRecords;
+  }
+
+  public void setInputRecords(final long inputRecords) {
+    this.inputRecords = inputRecords;
+  }
+
+  public long getOutputRecords() {
+    return outputRecords;
+  }
+
+  public void setOutputRecords(final long outputRecords) {
+    this.outputRecords = outputRecords;
+  }
+
+  public long getInputDataSize() {
+    return inputDataSize;
+  }
+
+  public void setInputDataSize(final long inputDataSize) {
+    this.inputDataSize = inputDataSize;
+  }
+
+  public long getOutputDataSize() {
+    return outputDataSize;
+  }
+
+  public void setOutputDataSize(final long outputDataSize) {
+    this.outputDataSize = outputDataSize;
+  }
+
+  public String getOperatorId() {
+    return operatorId;
+  }
+
+  public void setOperatorId(final String operatorId) {
+    this.operatorId = operatorId;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("{");
+    sb.append(" operatorId: ").append(operatorId);
+    if (inputRecords >= 0) {
+      sb.append(" inputRecords: ").append(inputRecords);
+    }
+    if (inputDataSize >= 0) {
+      sb.append(" inputDataSize: ").append(inputDataSize);
+    }
+    if (outputRecords >= 0) {
+      sb.append(" outputRecords: ").append(outputRecords);
+    }
+    if (outputDataSize >= 0) {
+      sb.append(" outputDataSize: ").append(outputDataSize);
+    }
+    sb.append(" }");
+    return sb.toString();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/RuntimeStatisticsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/RuntimeStatisticsCache.java
new file mode 100644
index 0000000..ba96035
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/RuntimeStatisticsCache.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.Map;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Process-wide cache of per-operator runtime statistics, keyed by the MD5 hash of the
+ * normalized query string.
+ */
+// TODO make this generic
+public class RuntimeStatisticsCache {
+  public static final Cache<String, Map<String, OperatorStats>> RUNTIME_STATS_CACHE = CacheBuilder.newBuilder()
+      .recordStats().maximumSize(1000).build();
+
+  public String getCacheStatsString() {
+    return RUNTIME_STATS_CACHE.stats().toString();
+  }
+
+  @Override
+  public String toString() {
+    return RUNTIME_STATS_CACHE.toString();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
index 8ffb4ce..d6cd78d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
@@ -44,6 +44,7 @@
   private State basicStatsState;
   private Map<String, ColStatistics> columnStats;
   private State columnStatsState;
+  private boolean runtimeStats;
 
   public Statistics() {
     this(0, 0, -1);
@@ -106,6 +107,9 @@ public void setColumnStatsState(State columnStatsState) {
   @Explain(displayName = "Statistics")
   public String toString() {
     StringBuilder sb = new StringBuilder();
+    if (runtimeStats) {
+      sb.append("(RUNTIME) ");
+    }
     sb.append("Num rows: ");
     sb.append(numRows);
     if (runTimeNumRows >= 0) {
@@ -123,6 +127,9 @@ public String toString() {
   @Explain(displayName = "Statistics", explainLevels = { Level.USER })
   public String toUserLevelExplainString() {
     StringBuilder sb = new StringBuilder();
+    if (runtimeStats) {
+      sb.append("runtime: ");
+    }
     sb.append("rows=");
     sb.append(numRows);
     if (runTimeNumRows >= 0) {
@@ -140,6 +147,9 @@ public String toUserLevelExplainString() {
   public String extendedToString() {
     StringBuilder sb = new StringBuilder();
+    if (runtimeStats) {
+      sb.append(" (runtime) ");
+    }
     sb.append(" numRows: ");
     sb.append(numRows);
     sb.append(" dataSize: ");
@@ -165,6 +175,8 @@ public Statistics clone() throws CloneNotSupportedException {
       }
       clone.setColumnStats(cloneColStats);
     }
+    // TODO: this boolean flag is set only by RS stats annotation at this point
+    //clone.setRuntimeStats(runtimeStats);
     return clone;
   }
@@ -284,4 +296,12 @@ public long getRunTimeNumRows() {
   public void setRunTimeNumRows(long runTimeNumRows) {
     this.runTimeNumRows = runTimeNumRows;
   }
+
+  public boolean isRuntimeStats() {
+    return runtimeStats;
+  }
+
+  public void setRuntimeStats(final boolean runtimeStats) {
+    this.runtimeStats = runtimeStats;
+  }
 }
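
How the query fingerprint behaves in practice: QueryPlan.normalizeQuery collapses whitespace, strips backquotes, and lowercases the statement before hashing, so cosmetically different spellings of the same query map to one cache key. A minimal self-contained sketch of the same scheme (the class name is illustrative, not part of the patch):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class QueryFingerprintDemo {
  // Mirrors QueryPlan.normalizeQuery: collapse whitespace, drop backquotes, lowercase.
  static String normalize(String q) {
    return q.replaceAll("\\s+", " ").replaceAll("`", "").trim().toLowerCase();
  }

  // Mirrors QueryPlan.getQueryMD5: Base64-encoded MD5 of the normalized text.
  static String fingerprint(String q) throws NoSuchAlgorithmException {
    MessageDigest md = MessageDigest.getInstance("MD5");
    byte[] hash = md.digest(normalize(q).getBytes(StandardCharsets.UTF_8));
    return Base64.getEncoder().encodeToString(hash);
  }

  public static void main(String[] args) throws NoSuchAlgorithmException {
    // Formatting differences collapse to the same key...
    String a = "SELECT  *\nFROM `t`  WHERE x = 1";
    String b = "select * from t where x = 1";
    System.out.println(fingerprint(a).equals(fingerprint(b))); // true
    // ...but lowercasing also applies inside string literals, so
    // WHERE s = 'A' and WHERE s = 'a' would collide -- a limitation worth noting.
  }
}
```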
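The Guava cache behind RuntimeStatisticsCache returns null from getIfPresent on a miss rather than computing a value, which is why both TezRuntimeStatisticsHook and the stats annotator null-check the lookup. A minimal sketch of the same usage pattern, with a Long payload standing in for OperatorStats to stay self-contained:

```java
import java.util.HashMap;
import java.util.Map;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class RuntimeStatsCacheDemo {
  // Same shape as RUNTIME_STATS_CACHE: query fingerprint -> (operatorId -> row count).
  private static final Cache<String, Map<String, Long>> CACHE =
      CacheBuilder.newBuilder().recordStats().maximumSize(1000).build();

  public static void main(String[] args) {
    Map<String, Long> stats = new HashMap<>();
    stats.put("TS_0", 42L);
    CACHE.put("someQueryMD5", stats);

    // getIfPresent returns null on a miss instead of loading a value,
    // so every caller must handle the null case explicitly.
    System.out.println(CACHE.getIfPresent("someQueryMD5")); // {TS_0=42}
    System.out.println(CACHE.getIfPresent("unknownKey"));   // null
    System.out.println(CACHE.stats()); // hit/miss counters enabled by recordStats()
  }
}
```

The maximumSize(1000) bound means entries for rarely repeated queries are evicted LRU-style, so a cache miss simply falls back to the compile-time estimates.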
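TezCompiler registers the hook by rewriting the comma-separated hive.exec.post.hooks / hive.exec.failure.hooks values. A sketch of that idempotent append (the helper and class names are illustrative); note that round-tripping through a HashSet does not preserve the order of previously configured hooks, matching the patch's behavior:

```java
import java.util.Set;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;

public class HookRegistrationDemo {
  // Idempotently appends hookClass to a comma-separated hook list,
  // as runStatsAnnotation does for the post-exec and on-failure hook configs.
  static String appendHook(String current, String hookClass) {
    Set<String> hooks = Sets.newHashSet(
        Splitter.on(",").trimResults().omitEmptyStrings().split(Strings.nullToEmpty(current)));
    hooks.add(hookClass); // no-op if already present
    return Joiner.on(",").join(hooks);
  }

  public static void main(String[] args) {
    String hook = "org.apache.hadoop.hive.ql.hooks.TezRuntimeStatisticsHook";
    System.out.println(appendHook(null, hook));                              // just the new hook
    System.out.println(appendHook("com.example.MyHook, " + hook, hook));     // not duplicated
  }
}
```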
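Counter naming is the contract between execution and compilation: initializeOp publishes RECORDS_OUT_OPERATOR_<operatorId> into statsMap, and the hook reconstructs exactly that string to look up the Tez counter. A sketch of both naming rules (illustrative class; mirrors Operator.getCounterName and the per-operator key):

```java
public class CounterNameDemo {
  enum Counter { RECORDS_OUT_OPERATOR, RECORDS_OUT_INTERMEDIATE }

  // Per-operator counter key, e.g. RECORDS_OUT_OPERATOR_TS_3 for operator id TS_3.
  static String perOperator(String operatorId) {
    return Counter.RECORDS_OUT_OPERATOR.name() + "_" + operatorId;
  }

  // Context-suffixed counter, mirroring Operator.getCounterName: spaces become underscores.
  static String withContext(Counter counter, String contextName) {
    String context = contextName == null ? "" : contextName;
    if (!context.isEmpty()) {
      context = "_" + context.replace(" ", "_");
    }
    return counter + context;
  }

  public static void main(String[] args) {
    System.out.println(perOperator("TS_3")); // RECORDS_OUT_OPERATOR_TS_3
    System.out.println(withContext(Counter.RECORDS_OUT_INTERMEDIATE, "Map 1"));
    // RECORDS_OUT_INTERMEDIATE_Map_1
  }
}
```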
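When the annotator finds cached stats, it overrides the estimated row count with the observed one and recomputes the data size. A simplified sketch of that rescaling using a fixed average row width (an assumption for illustration only; the patch actually derives the size from column statistics via StatsUtils.getDataSizeFromColumnStats, guarded by getMaxIfOverflow):

```java
public class RuntimeStatsOverrideDemo {
  // Replace the estimated row count with the observed one and rescale the data size,
  // assuming data size is roughly proportional to row count.
  static long rescaleDataSize(long oldRows, long oldSize, long newRows) {
    if (oldRows <= 0) {
      return oldSize; // nothing to scale from
    }
    double avgRowSize = (double) oldSize / oldRows;
    return (long) Math.ceil(avgRowSize * newRows);
  }

  public static void main(String[] args) {
    // Estimator said 1,000,000 rows / 64 MB; the previous run observed 1,200 rows.
    System.out.println(rescaleDataSize(1_000_000L, 64_000_000L, 1_200L)); // 76800
  }
}
```

The (RUNTIME) / "runtime:" markers added to Statistics.toString make it visible in EXPLAIN output when a plan was annotated from these observed values rather than from estimates.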