diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e540d023bd..de1c56ef6c 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4257,10 +4257,19 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "comma separated list of plugin can be used:\n"
             + "  overlay: hiveconf subtree 'reexec.overlay' is used as an overlay in case of an execution errors out\n"
             + "  reoptimize: collects operator statistics during execution and recompile the query after a failure"),
+    HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE("hive.query.reexecution.stats.persist.scope", "query",
+        new StringSet("query", "hiveserver", "metastore"),
+        "Sets the persistence scope of runtime statistics:\n"
+            + "  query: runtime statistics are only used during re-execution\n"
+            + "  hiveserver: runtime statistics are persisted in the hiveserver - all sessions share them\n"
+            + "  metastore: runtime statistics are persisted in the metastore as well"),
+
+    HIVE_QUERY_MAX_REEXECUTION_COUNT("hive.query.reexecution.max.count", 1,
+        "Maximum number of re-executions for a single query."),
     HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS("hive.query.reexecution.always.collect.operator.stats", false,
-        "Used during testing"),
+        "If session-level runtime statistics are enabled, this option can be used to collect operator statistics all the time"),
+    HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE("hive.query.reexecution.stats.cache.size", 100_000,
+        "Size of the runtime statistics cache, in OperatorStats entries; a query plan consists of roughly 100 of them"),
+
     HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
         "If the query results cache is enabled. This will keep results of previously executed queries " +
diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 48d62a8bf9..2b92a5f6ad 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -512,6 +512,7 @@ minillaplocal.query.files=\
   retry_failure.q,\
   retry_failure_stat_changes.q,\
   retry_failure_oom.q,\
+  runtime_stats_hs2.q,\
   bucketsortoptimize_insert_2.q,\
   check_constraint.q,\
   cbo_gby.q,\
diff --git ql/src/java/org/apache/hadoop/hive/ql/Context.java ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 9ca8b0007e..70846ac3ce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -159,7 +159,7 @@
   private boolean isExplainPlan = false;

   private PlanMapper planMapper = new PlanMapper();
-  private StatsSource runtimeStatsSource;
+  private StatsSource statsSource;
   private int executionIndex;

   public void setOperation(Operation operation) {
@@ -1047,16 +1047,16 @@ public PlanMapper getPlanMapper() {
     return planMapper;
   }

-  public void setStatsSource(StatsSource runtimeStatsSource) {
-    this.runtimeStatsSource = runtimeStatsSource;
+  public void setStatsSource(StatsSource statsSource) {
+    this.statsSource = statsSource;
   }

   public StatsSource getStatsSource() {
-    if (runtimeStatsSource != null) {
-      return runtimeStatsSource;
+    if (statsSource != null) {
+      return statsSource;
     } else {
       // hierarchical; add def stats also here
-      return new EmptyStatsSource();
+      return EmptyStatsSource.INSTANCE;
     }
   }
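The Context change above also swaps the per-call new EmptyStatsSource() for a shared EmptyStatsSource.INSTANCE. This is the classic null-object singleton: callers always receive a usable StatsSource and never need a null check, and since the empty source is stateless one shared instance suffices. A minimal, self-contained sketch of the idea (simplified types, not Hive's actual classes):

    import java.util.Optional;

    interface Stats {
      Optional<Long> lookup(String key);
    }

    final class EmptyStats implements Stats {
      // one shared, stateless instance instead of a fresh allocation per call
      static final Stats INSTANCE = new EmptyStats();

      private EmptyStats() {
      }

      @Override
      public Optional<Long> lookup(String key) {
        return Optional.empty();
      }
    }

    class QueryContext {
      private Stats stats; // may legitimately stay null

      Stats getStats() {
        // callers always get a usable object; no null checks downstream
        return stats != null ? stats : EmptyStats.INSTANCE;
      }
    }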
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index a88453c978..364452c6f3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2740,6 +2740,10 @@ public void setStatsSource(StatsSource runtimeStatsSource) {
     this.statsSource = runtimeStatsSource;
   }

+  public StatsSource getStatsSource() {
+    return statsSource;
+  }
+
   @Override
   public boolean hasResultSet() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index d3fbf07de4..9551d040e5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -161,6 +161,7 @@
 import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.VectorSelectDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
 import org.apache.hadoop.hive.ql.plan.ptf.OrderExpressionDef;
 import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef;
 import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef;
@@ -375,6 +376,8 @@ private void clearNotVectorizedReason() {
   private Set<VirtualColumn> availableVectorizedVirtualColumnSet = null;
   private Set<VirtualColumn> neededVirtualColumnSet = null;

+  private PlanMapper planMapper;
+
   public class VectorizerCannotVectorizeException extends Exception {
   }

@@ -867,7 +870,7 @@ private void queueDelayedFixup(Operator parent,
   }

   private void runDelayedFixups() {
-    for (Entry<Operator<? extends OperatorDesc>, Set<ImmutablePair<Operator<?>, Operator<?>>>> delayed
+    for (Entry<Operator<?>, Set<ImmutablePair<Operator<?>, Operator<?>>>> delayed
         : delayedFixups.entrySet()) {
       Operator<?> key = delayed.getKey();
       Set<ImmutablePair<Operator<?>, Operator<?>>> value =
@@ -1470,7 +1473,7 @@ private boolean verifyAndSetVectorPartDesc(
         enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname);
       }
     }
-    
+
     return false;
   }
@@ -2183,6 +2186,7 @@ private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork,
   public PhysicalContext resolve(PhysicalContext physicalContext) throws SemanticException {

     hiveConf = physicalContext.getConf();
+    planMapper = physicalContext.getContext().getPlanMapper();

     String vectorizationEnabledOverrideString =
         HiveConf.getVar(hiveConf,
@@ -2708,7 +2712,7 @@ private boolean validatePTFOperator(PTFOperator op, VectorizationContext vContex
     }
     if (exprNodeDescList != null) {
       ExprNodeDesc exprNodeDesc = exprNodeDescList.get(0);
-      
+
       if (containsLeadLag(exprNodeDesc)) {
         setOperatorIssue("lead and lag function not supported in argument expression of aggregation function " + functionName);
         return false;
@@ -4951,6 +4955,8 @@ private static VectorPTFInfo createVectorPTFInfo(Operator
+
+    planMapper.link(op, vectorOp);
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
--- ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.plan.mapper;

+import java.util.Map;
 import java.util.Optional;

@@ -26,6 +27,11 @@
 public class EmptyStatsSource implements StatsSource {

+  public static StatsSource INSTANCE = new EmptyStatsSource();
+
+  private EmptyStatsSource() {
+  }
+
   @Override
   public boolean canProvideStatsFor(Class<?> class1) {
     return false;
@@ -35,4 +41,9 @@ public boolean canProvideStatsFor(Class<?> class1) {
     return Optional.empty();
   }

+  @Override
+  public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+    throw new RuntimeException("This is an empty source!");
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SessionStatsSource.java ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SessionStatsSource.java
new file mode 100644
index 0000000000..6866b5c36d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SessionStatsSource.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+public class SessionStatsSource implements StatsSource {
+
+  private final Cache<OpTreeSignature, OperatorStats> cache;
+
+  // FIXME: consider not requesting hiveconf
+  public SessionStatsSource(HiveConf conf) {
+    int size = conf.getIntVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE);
+    cache = CacheBuilder.newBuilder().maximumSize(size).build();
+  }
+
+  public void put(OpTreeSignature sig, OperatorStats opStat) {
+    cache.put(sig, opStat);
+  }
+
+  @Override
+  public Optional<OperatorStats> lookup(OpTreeSignature treeSig) {
+    return Optional.ofNullable(cache.getIfPresent(treeSig));
+  }
+
+  @Override
+  public boolean canProvideStatsFor(Class<?> clazz) {
+    if (Operator.class.isAssignableFrom(clazz)) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+    for (Entry<OpTreeSignature, OperatorStats> entry : map.entrySet()) {
+      put(entry.getKey(), entry.getValue());
+    }
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
index b5a3c2459f..3d6c257026 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.plan.mapper;

 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;

@@ -56,4 +57,9 @@ public boolean canProvideStatsFor(Class<?> class1) {
     return false;
   }

+  @Override
+  public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+    throw new RuntimeException("This stats source does not accept new stats!");
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
index df5aa0c679..e8d51c91df 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.plan.mapper;

+import java.util.Map;
 import java.util.Optional;

 import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
@@ -25,8 +26,10 @@

 public interface StatsSource {

-  boolean canProvideStatsFor(Class<?> class1);
+  boolean canProvideStatsFor(Class<?> clazz);

   Optional<OperatorStats> lookup(OpTreeSignature treeSig);

+  void putAll(Map<OpTreeSignature, OperatorStats> map);
+
 }
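SessionStatsSource above bounds its memory footprint with a Guava cache whose maximumSize comes from the new hive.query.reexecution.stats.cache.size setting, and its lookup wraps the cache's null-on-miss convention into an Optional. A standalone sketch of exactly that caching behavior, assuming only Guava on the classpath (key and value types simplified to strings and longs):

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class BoundedStatsCacheDemo {
      public static void main(String[] args) {
        // maximumSize plays the role of hive.query.reexecution.stats.cache.size
        Cache<String, Long> cache = CacheBuilder.newBuilder()
            .maximumSize(2)
            .build();

        cache.put("TS_0", 8L);   // operator signature -> observed row count
        cache.put("FIL_1", 5L);
        cache.put("SEL_2", 5L);  // over the bound: Guava may now evict an entry

        // getIfPresent returns null on a miss, hence Optional.ofNullable(...) above
        System.out.println(cache.getIfPresent("TS_0"));
        System.out.println("entries kept: " + cache.size());
      }
    }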
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
new file mode 100644
index 0000000000..f138698f5f
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.EquivGroup;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class StatsSources {
+
+  public static class MapBackedStatsSource implements StatsSource {
+
+    private Map<OpTreeSignature, OperatorStats> map = new HashMap<>();
+
+    @Override
+    public boolean canProvideStatsFor(Class<?> clazz) {
+      if (Operator.class.isAssignableFrom(clazz)) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public Optional<OperatorStats> lookup(OpTreeSignature treeSig) {
+      return Optional.ofNullable(map.get(treeSig));
+    }
+
+    @Override
+    public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+      this.map.putAll(map);
+    }
+
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(StatsSources.class);
+
+  public static StatsSource getStatsSourceContaining(StatsSource currentStatsSource, PlanMapper pm) {
+    if (currentStatsSource instanceof SessionStatsSource) {
+      SessionStatsSource sessionStatsSource = (SessionStatsSource) currentStatsSource;
+      loadFromPlanMapper(sessionStatsSource, pm);
+      return sessionStatsSource;
+    } else {
+      return new SimpleRuntimeStatsSource(pm);
+    }
+  }
+
+  public static void loadFromPlanMapper(SessionStatsSource sessionStatsSource, PlanMapper pm) {
+    Map<OpTreeSignature, OperatorStats> map = extractStatMapFromPlanMapper(pm);
+    sessionStatsSource.putAll(map);
+  }
+
+  private static Map<OpTreeSignature, OperatorStats> extractStatMapFromPlanMapper(PlanMapper pm) {
+    Map<OpTreeSignature, OperatorStats> map = new HashMap<>();
+    Iterator<EquivGroup> it = pm.iterateGroups();
+    while (it.hasNext()) {
+      EquivGroup e = it.next();
+      List<OperatorStats> stat = e.getAll(OperatorStats.class);
+      List<OpTreeSignature> sig = e.getAll(OpTreeSignature.class);
+
+      if (stat.size() > 1 || sig.size() > 1) {
+        StringBuffer sb = new StringBuffer();
+        sb.append(String.format("expected(stat-sig) 1-1, got %d-%d ;", stat.size(), sig.size()));
+        for (OperatorStats s : stat) {
+          sb.append(s);
+          sb.append(";");
+        }
+        for (OpTreeSignature s : sig) {
+          sb.append(s);
+          sb.append(";");
+        }
+        LOG.debug(sb.toString());
+      }
+      if (stat.size() >= 1 && sig.size() >= 1) {
+        map.put(sig.get(0), stat.get(0));
+      }
+    }
+    return map;
+  }
+
+  private static StatsSource globalStatsSource;
+
+  public static StatsSource globalStatsSource(HiveConf conf) {
+    if (globalStatsSource == null) {
+      globalStatsSource = new SessionStatsSource(conf);
+    }
+    return globalStatsSource;
+  }
+
+  @VisibleForTesting
+  public static void clearAllStats() {
+    globalStatsSource = null;
+  }
+}
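One caveat worth noting: globalStatsSource(conf) is lazily initialized without synchronization, so two sessions hitting a fresh HiveServer2 concurrently could each observe null and build their own cache, one of which is then silently dropped. This sketch (not part of the patch) shows the usual remedy, the initialization-on-demand holder idiom; the type is a simplified stand-in, and unlike the original the HiveConf-driven cache size would have to be threaded in some other way:

    public final class GlobalStatsHolder {
      private GlobalStatsHolder() {
      }

      private static class Holder {
        // the JVM guarantees this initializer runs exactly once,
        // on the first access to Holder
        static final Object INSTANCE = new Object(); // stand-in for SessionStatsSource
      }

      public static Object get() {
        return Holder.INSTANCE;
      }
    }

A plain synchronized method on globalStatsSource(conf) would work just as well here, given how rarely it is called.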
diff --git ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
index 2b0d23c6f2..be62fc0075 100644
--- ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
@@ -59,6 +59,7 @@
    */
   boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper);

+  /**
+   * Called after every execution attempt, with the plan of the finished execution.
+   */
+  void afterExecute(PlanMapper planMapper, boolean successful);
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
index 8a5595d210..501f0b40ed 100644
--- ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
@@ -156,6 +156,9 @@ public CommandProcessorResponse run() {
       LOG.info("Execution #{} of query", executionIndex);
       CommandProcessorResponse cpr = coreDriver.run();

+      PlanMapper oldPlanMapper = coreDriver.getPlanMapper();
+      afterExecute(oldPlanMapper, cpr.getResponseCode() == 0);
+
       boolean shouldReExecute = explainReOptimization && executionIndex==1;
       shouldReExecute |= cpr.getResponseCode() != 0 && shouldReExecute();

@@ -164,25 +167,34 @@
       }
       LOG.info("Preparing to re-execute query");
       prepareToReExecute();
-      PlanMapper oldPlanMapper = coreDriver.getPlanMapper();
       CommandProcessorResponse compile_resp = coreDriver.compileAndRespond(currentQuery);
       if (compile_resp.failed()) {
+        LOG.error("Recompilation of the query failed; this is unexpected.");
        // FIXME: somehow record that the re-execution compilation has failed; the query had been successfully compiled before
         return compile_resp;
       }

       PlanMapper newPlanMapper = coreDriver.getPlanMapper();
       if (!explainReOptimization && !shouldReExecuteAfterCompile(oldPlanMapper, newPlanMapper)) {
+        LOG.info("re-running the query would probably not yield better results; returning with last error");
        // FIXME: retain old error; or create a new one?
         return cpr;
       }
     }
   }

+  private void afterExecute(PlanMapper planMapper, boolean success) {
+    for (IReExecutionPlugin p : plugins) {
+      p.afterExecute(planMapper, success);
+    }
+  }
+
   private boolean shouldReExecuteAfterCompile(PlanMapper oldPlanMapper, PlanMapper newPlanMapper) {
     boolean ret = false;
     for (IReExecutionPlugin p : plugins) {
-      ret |= p.shouldReExecute(executionIndex, oldPlanMapper, newPlanMapper);
+      boolean shouldReExecute = p.shouldReExecute(executionIndex, oldPlanMapper, newPlanMapper);
+      LOG.debug("{}.shouldReExecuteAfterCompile = {}", p, shouldReExecute);
+      ret |= shouldReExecute;
     }
     return ret;
   }
@@ -190,7 +202,9 @@ private boolean shouldReExecuteAfterCompile(PlanMapper oldPlanMapper, PlanMapper
   private boolean shouldReExecute() {
     boolean ret = false;
     for (IReExecutionPlugin p : plugins) {
-      ret |= p.shouldReExecute(executionIndex);
+      boolean shouldReExecute = p.shouldReExecute(executionIndex);
+      LOG.debug("{}.shouldReExecute = {}", p, shouldReExecute);
+      ret |= shouldReExecute;
     }
     return ret;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
index 4ee3c14b39..950903c5c1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
@@ -80,4 +80,8 @@ public boolean shouldReExecute(int executionNum, PlanMapper pm1, PlanMapper pm2)
   public void beforeExecute(int executionIndex, boolean explainReOptimization) {
   }

+  @Override
+  public void afterExecute(PlanMapper planMapper, boolean success) {
+  }
+
 }
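Taken together, the ReExecDriver changes implement this control flow: run, feed the collected statistics back through the new afterExecute hook, recompile, and re-execute only if the recompiled plan actually changed. A compressed sketch of that loop with illustrative names; this is not Hive's actual API, and the new hive.query.reexecution.max.count is assumed to be the bound on attempts (its wiring is not visible in the hunks above):

    import java.util.List;

    class ReExecLoopSketch {
      interface Plugin {
        void afterExecute(Object planMapper, boolean success);
        boolean shouldReExecute(int executionNum);
        boolean shouldReExecute(int executionNum, Object oldPm, Object newPm);
      }

      int run(List<Plugin> plugins, int maxExecutions) {
        int executionIndex = 0;
        while (true) {
          executionIndex++;
          int rc = execute();                // stands in for coreDriver.run()
          Object oldPm = planMapper();       // plan + runtime stats of this run
          for (Plugin p : plugins) {
            p.afterExecute(oldPm, rc == 0);  // new hook: lets plugins bank the stats
          }
          boolean retry = false;
          for (Plugin p : plugins) {
            retry |= p.shouldReExecute(executionIndex);
          }
          if (rc == 0 || !retry || executionIndex >= maxExecutions) {
            return rc;
          }
          if (recompile() != 0) {
            return rc;                       // unexpected: it compiled fine before
          }
          Object newPm = planMapper();
          boolean planChanged = false;
          for (Plugin p : plugins) {
            planChanged |= p.shouldReExecute(executionIndex, oldPm, newPm);
          }
          if (!planChanged) {
            return rc;                       // same plan again: keep the last error
          }
        }
      }

      int execute() { return 1; }
      int recompile() { return 0; }
      Object planMapper() { return new Object(); }
    }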
diff --git ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
index f731315956..409cc7312c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
@@ -21,6 +21,7 @@
 import java.util.Iterator;
 import java.util.List;

+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -29,7 +30,8 @@
 import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
-import org.apache.hadoop.hive.ql.plan.mapper.SimpleRuntimeStatsSource;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSource;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
 import org.apache.hadoop.hive.ql.stats.OperatorStatsReaderHook;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +49,8 @@

   private OperatorStatsReaderHook statsReaderHook;

+  private boolean alwaysCollectStats;
+
   class LocalHook implements ExecuteWithHookContext {

     @Override
@@ -62,10 +66,10 @@ public void run(HookContext hookContext) throws Exception {
             if (message.contains("Vertex failed,") && isOOM) {
               retryPossible = true;
             }
-            System.out.println(exception);
           }
         }
       }
+      LOG.info("ReOptimization: retryPossible: {}", retryPossible);
     }
   }
@@ -77,9 +81,25 @@ public void initialize(Driver driver) {
     statsReaderHook = new OperatorStatsReaderHook();
     coreDriver.getHookRunner().addOnFailureHook(statsReaderHook);
     coreDriver.getHookRunner().addPostHook(statsReaderHook);
-    // statsReaderHook.setCollectOnSuccess(true);
-    statsReaderHook.setCollectOnSuccess(
-        driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS));
+    alwaysCollectStats = driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS);
+    statsReaderHook.setCollectOnSuccess(alwaysCollectStats);
+
+    coreDriver.setStatsSource(getStatsSource(driver.getConf()));
+  }
+
+  static enum StatsSourceMode {
+    query, hiveserver;
+  }
+
+  private StatsSource getStatsSource(HiveConf conf) {
+    StatsSourceMode mode = StatsSourceMode.valueOf(conf.getVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE));
+    switch (mode) {
+    case query:
+      return new StatsSources.MapBackedStatsSource();
+    case hiveserver:
+      return StatsSources.globalStatsSource(conf);
+    }
+    throw new RuntimeException("Unknown StatsSource setting: " + mode);
+  }

 @Override
@@ -90,17 +110,19 @@ public boolean shouldReExecute(int executionNum) {

   @Override
   public void prepareToReExecute() {
     statsReaderHook.setCollectOnSuccess(true);
-    PlanMapper pm = coreDriver.getContext().getPlanMapper();
-    coreDriver.setStatsSource(new SimpleRuntimeStatsSource(pm));
     retryPossible = false;
+    coreDriver.setStatsSource(
+        StatsSources.getStatsSourceContaining(coreDriver.getStatsSource(), coreDriver.getPlanMapper()));
   }

   @Override
   public boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper) {
-    return planDidChange(oldPlanMapper, newPlanMapper);
+    boolean planDidChange = !planEquals(oldPlanMapper, newPlanMapper);
+    LOG.info("planDidChange: {}", planDidChange);
+    return planDidChange;
   }

-  private boolean planDidChange(PlanMapper pmL, PlanMapper pmR) {
+  private boolean planEquals(PlanMapper pmL, PlanMapper pmR) {
     List<Operator> opsL = getRootOps(pmL);
     List<Operator> opsR = getRootOps(pmR);
     for (Iterator<Operator> itL = opsL.iterator(); itL.hasNext();) {
@@ -135,4 +157,12 @@ public void beforeExecute(int executionIndex, boolean explainReOptimization) {
     }
   }

+  @Override
+  public void afterExecute(PlanMapper planMapper, boolean success) {
+    if (alwaysCollectStats) {
+      coreDriver.setStatsSource(
+          StatsSources.getStatsSourceContaining(coreDriver.getStatsSource(), planMapper));
+    }
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 6003ced27e..a815d36a0c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -79,6 +79,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSource;
 import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.AuthorizationMetaStoreFilterHook;
@@ -119,6 +120,7 @@ private final Map<String, Map<String, Table>> tempTables = new HashMap<String,
   private final Map<String, Map<String, ColumnStatisticsObj>> tempTableColStats = new HashMap<String, Map<String, ColumnStatisticsObj>>();
+  private StatsSource sessionStatsSource;

   protected ClassLoader parentLoader;

@@ -2005,6 +2007,15 @@ public void addCleanupItem(Closeable item) {
     return currentFunctionsInUse;
   }

+  public StatsSource getSessionStatsSource() {
+    return sessionStatsSource;
+  }
+
+  public void setSessionStatsSource(StatsSource sessionStatsSource) {
+    this.sessionStatsSource = sessionStatsSource;
+  }
+
 }

 class ResourceMaps {
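SessionState now carries a StatsSource slot, but this diff adds only the accessors; nothing wires them up yet. The intent is presumably a scope between "query" and "hiveserver": statistics shared by all queries of one session. A sketch of that usage with hypothetical names, a plain map standing in for the StatsSource:

    import java.util.HashMap;
    import java.util.Map;

    class SessionScopedStatsSketch {
      static class Session {
        private Map<String, Long> sessionStatsSource; // stand-in for StatsSource

        Map<String, Long> getSessionStatsSource() {
          return sessionStatsSource;
        }

        void setSessionStatsSource(Map<String, Long> source) {
          this.sessionStatsSource = source;
        }
      }

      static Map<String, Long> statsSourceFor(Session session) {
        // the first query of the session creates the source; later queries reuse
        // it, so their optimizer sees stats observed by earlier executions
        if (session.getSessionStatsSource() == null) {
          session.setSessionStatsSource(new HashMap<>());
        }
        return session.getSessionStatsSource();
      }
    }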
diff --git ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java
index 0afc533da8..b09aafb3a4 100644
--- ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java
+++ ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java
@@ -77,12 +77,16 @@ public void testTree1() {
     Operator ts = getTsOp(i);
     Operator fil = getFilterOp(j);

-    ts.getChildOperators().add(fil);
-    fil.getParentOperators().add(ts);
+    connectOperators(ts, fil);

     return fil;
   }

+  private void connectOperators(Operator parent, Operator child) {
+    parent.getChildOperators().add(child);
+    child.getParentOperators().add(parent);
+  }
+
   @Test
   public void testTableScand() {
     Operator t1 = getTsOp(3);
@@ -157,4 +161,5 @@ public static void checkTreeNotEquals(Operator o1, Operator o2) {
   }

+
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
index 18aeb3338b..81269702de 100644
--- ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
+++ ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
@@ -172,7 +172,6 @@ public void testMappingJoinLookup() throws ParseException {

   private static IDriver createDriver() {
-    // HiveConf conf = new HiveConf(Driver.class);
     HiveConf conf = env_setup.getTestCtx().hiveConf;

     conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ENABLED, true);
     conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS, true);
diff --git ql/src/test/queries/clientpositive/runtime_stats_hs2.q ql/src/test/queries/clientpositive/runtime_stats_hs2.q
new file mode 100644
index 0000000000..34a8dd3f2f
--- /dev/null
+++ ql/src/test/queries/clientpositive/runtime_stats_hs2.q
@@ -0,0 +1,22 @@
+
+create table tx(a int,u int);
+insert into tx values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(10,10);
+
+create table px(a int,p int);
+insert into px values (2,2),(3,3),(5,5),(7,7),(11,11);
+
+set hive.explain.user=true;
+set hive.query.reexecution.enabled=true;
+set hive.query.reexecution.always.collect.operator.stats=true;
+set hive.query.reexecution.strategies=overlay,reoptimize;
+set hive.query.reexecution.stats.persist.scope=hiveserver;
+
+-- join output estimate is underestimated: 1 row
+explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2;
+
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2;
+
+-- join output estimate is now 3 rows; all the operator stats are "runtime"
+explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2;
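With the q file registered in minillaplocal.query.files above, it runs under the MiniLlapLocal driver like any other qtest; from the itests tree something along the lines of

    mvn test -Dtest=TestMiniLlapLocalCliDriver -Dqfile=runtime_stats_hs2.q

should execute it and compare against the golden file below. Treat the invocation as illustrative of the usual Hive qtest workflow rather than authoritative; the exact module to run it from can differ between branches.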
diff --git ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out
new file mode 100644
index 0000000000..4d60b8cfa5
--- /dev/null
+++ ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out
@@ -0,0 +1,141 @@
+PREHOOK: query: create table tx(a int,u int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tx
+POSTHOOK: query: create table tx(a int,u int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tx
+PREHOOK: query: insert into tx values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(10,10)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tx
+POSTHOOK: query: insert into tx values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(10,10)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tx
+POSTHOOK: Lineage: tx.a SCRIPT []
+POSTHOOK: Lineage: tx.u SCRIPT []
+PREHOOK: query: create table px(a int,p int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@px
+POSTHOOK: query: create table px(a int,p int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@px
+PREHOOK: query: insert into px values (2,2),(3,3),(5,5),(7,7),(11,11)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@px
+POSTHOOK: query: insert into px values (2,2),(3,3),(5,5),(7,7),(11,11)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@px
+POSTHOOK: Lineage: px.a SCRIPT []
+POSTHOOK: Lineage: px.p SCRIPT []
+PREHOOK: query: explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+POSTHOOK: type: QUERY
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+
+Stage-0
+  Fetch Operator
+    limit:-1
+    Stage-1
+      Reducer 3 llap
+      File Output Operator [FS_15]
+        Group By Operator [GBY_13] (rows=1 width=8)
+          Output:["_col0"],aggregations:["sum(VALUE._col0)"]
+        <-Reducer 2 [CUSTOM_SIMPLE_EDGE] llap
+          PARTITION_ONLY_SHUFFLE [RS_12]
+            Group By Operator [GBY_11] (rows=1 width=8)
+              Output:["_col0"],aggregations:["sum(_col0)"]
+              Select Operator [SEL_9] (rows=1 width=8)
+                Output:["_col0"]
+                Merge Join Operator [MERGEJOIN_20] (rows=1 width=8)
+                  Conds:RS_6._col0=RS_7._col0(Inner),Output:["_col0","_col1"]
+                <-Map 1 [SIMPLE_EDGE] llap
+                  SHUFFLE [RS_6]
+                    PartitionCols:_col0
+                    Select Operator [SEL_2] (rows=1 width=4)
+                      Output:["_col0"]
+                      Filter Operator [FIL_18] (rows=1 width=4)
+                        predicate:((u < 10) and (u > 2))
+                        TableScan [TS_0] (rows=8 width=4)
+                          default@tx,tx,Tbl:COMPLETE,Col:COMPLETE,Output:["u"]
+                <-Map 4 [SIMPLE_EDGE] llap
+                  SHUFFLE [RS_7]
+                    PartitionCols:_col0
+                    Select Operator [SEL_5] (rows=1 width=4)
+                      Output:["_col0"]
+                      Filter Operator [FIL_19] (rows=1 width=4)
+                        predicate:((p < 10) and (p > 2))
+                        TableScan [TS_3] (rows=5 width=4)
+                          default@px,px,Tbl:COMPLETE,Col:COMPLETE,Output:["p"]
+
+PREHOOK: query: select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@px
+PREHOOK: Input: default@tx
+#### A masked pattern was here ####
+POSTHOOK: query: select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@px
+POSTHOOK: Input: default@tx
+#### A masked pattern was here ####
+83
+PREHOOK: query: explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+POSTHOOK: type: QUERY
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+
+Stage-0
+  Fetch Operator
+    limit:-1
+    Stage-1
+      Reducer 3 llap
+      File Output Operator [FS_15]
+        Group By Operator [GBY_13] (runtime: rows=1 width=8)
+          Output:["_col0"],aggregations:["sum(VALUE._col0)"]
+        <-Reducer 2 [CUSTOM_SIMPLE_EDGE] llap
+          PARTITION_ONLY_SHUFFLE [RS_12]
+            Group By Operator [GBY_11] (runtime: rows=1 width=8)
+              Output:["_col0"],aggregations:["sum(_col0)"]
+              Select Operator [SEL_9] (runtime: rows=3 width=8)
+                Output:["_col0"]
+                Merge Join Operator [MERGEJOIN_20] (runtime: rows=3 width=8)
+                  Conds:RS_6._col0=RS_7._col0(Inner),Output:["_col0","_col1"]
+                <-Map 1 [SIMPLE_EDGE] llap
+                  SHUFFLE [RS_6]
+                    PartitionCols:_col0
+                    Select Operator [SEL_2] (runtime: rows=5 width=4)
+                      Output:["_col0"]
+                      Filter Operator [FIL_18] (runtime: rows=5 width=4)
+                        predicate:((u < 10) and (u > 2))
+                        TableScan [TS_0] (runtime: rows=8 width=4)
+                          default@tx,tx,Tbl:COMPLETE,Col:COMPLETE,Output:["u"]
+                <-Map 4 [SIMPLE_EDGE] llap
+                  SHUFFLE [RS_7]
+                    PartitionCols:_col0
+                    Select Operator [SEL_5] (runtime: rows=3 width=4)
+                      Output:["_col0"]
+                      Filter Operator [FIL_19] (runtime: rows=3 width=4)
+                        predicate:((p < 10) and (p > 2))
+                        TableScan [TS_3] (runtime: rows=5 width=4)
+                          default@px,px,Tbl:COMPLETE,Col:COMPLETE,Output:["p"]
+