From 7d7c0f779fa9114ba927411eea7c1a89e5311e98 Mon Sep 17 00:00:00 2001 From: Panos Garefalakis Date: Thu, 12 Mar 2020 22:47:36 +0000 Subject: [PATCH 1/6] ProbeDecodeOptimizer initial patch --- .../hive/ql/exec/TableScanOperator.java | 39 +++++ .../ql/optimizer/ProbeDecodeOptimizer.java | 135 ++++++++++++++++++ .../hadoop/hive/ql/parse/GenTezUtils.java | 4 + .../hadoop/hive/ql/parse/TezCompiler.java | 7 + .../apache/hadoop/hive/ql/plan/MapWork.java | 11 ++ 5 files changed, 196 insertions(+) create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/optimizer/ProbeDecodeOptimizer.java diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index cd0637a971..20101e831d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -84,6 +84,37 @@ private String schemaEvolutionColumns; private String schemaEvolutionColumnsTypes; + private ProbeDecodeContext probeDecodeContext = null; + + /** + * Inner wrapper class for TS ProbeDecode optimization + */ + public static class ProbeDecodeContext { + + private final String mjSmallTableCacheKey; + private final String mjBigTableKeyColName; + private final byte mjSmallTablePos; + + public ProbeDecodeContext(String mjSmallTableCacheKey, byte mjSmallTablePos, String mjBigTableKeyColName) { + this.mjSmallTableCacheKey = mjSmallTableCacheKey; + this.mjSmallTablePos = mjSmallTablePos; + this.mjBigTableKeyColName = mjBigTableKeyColName; + } + + public String getMjSmallTableCacheKey() { + return mjSmallTableCacheKey; + } + + public byte getMjSmallTablePos() { + return mjSmallTablePos; + } + + public String getMjBigTableKeyColName() { + return mjBigTableKeyColName; + } + } + + public TableDesc getTableDescSkewJoin() { return tableDesc; } @@ -435,4 +466,12 @@ public VectorizationContext getOutputVectorizationContext() { return taskVectorizationContext; } + public ProbeDecodeContext getProbeDecodeContext() { + return probeDecodeContext; + } + + public void setProbeDecodeContext(ProbeDecodeContext probeDecodeContext) { + this.probeDecodeContext = probeDecodeContext; + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ProbeDecodeOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ProbeDecodeOptimizer.java new file mode 100644 index 0000000000..9453f2967f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ProbeDecodeOptimizer.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.optimizer; + +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + *

The goal of this rule is to find suitable table scan operators that could reduce the + * number of rows decoded at runtime using extra available information. + * + *

First the rule checks all the available MapJoin operators that could propagate the + * smaller HashTable on the probing side (where the TS is) to filter out rows that would + * never match. To do so, the HashTable information is pushed down to the TS properties. + * If a single TS is used by multiple operators (shared-work), this rule + * cannot be applied. + * + *

Second this rule can be extended to support static filter expressions like: + * select * from sales where sold_state = 'PR'; + * + *

The optimization only works with the Tez execution engine running on Llap. + * + */ +public class ProbeDecodeOptimizer extends Transform { + + private final static Logger LOG = LoggerFactory.getLogger(ProbeDecodeOptimizer.class); + + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + + // Get all TS operators + final List> topOps = new ArrayList<>(); + topOps.addAll(pctx.getTopOps().values()); + + if (topOps.isEmpty()) { + // Nothing to do here - is this even possible? + return pctx; + } + + Map tableJoinMap = new HashMap<>(); + for (Operator parentOp: topOps) { + // Make sure every parent Operator has a single branch and there is no shared-work + if (parentOp.getNumChild() > 1) { + continue; + } + + // Traverse graph and find MapJoins + Deque> deque = new LinkedList<>(); + deque.add(parentOp); + while (!deque.isEmpty()) { + Operator op = deque.pollLast(); + if (op instanceof MapJoinOperator) { + // MapJoin candidate + MapJoinOperator mop = (MapJoinOperator) op; + // Make sure this is a valid single (Number) key MapJoin and not Dynamic or BucketMapJoin + if (!mop.getConf().isBucketMapJoin() && !mop.getConf().isDynamicPartitionHashJoin() && isValidMapJoin(mop)) { + LOG.debug("ProbeDecode Mapping TS {} -> MJ {}", parentOp.getOperatorId(), mop.getOperatorId()); + tableJoinMap.put((TableScanOperator) parentOp, mop); + } + } + deque.addAll(op.getChildOperators()); + } + } + + // Propagate MapJoin information to the mapped TS operator (to be used by MapWork) + for (Map.Entry entry: tableJoinMap.entrySet()) { + + String mjCacheKey = entry.getValue().getConf().getCacheKey(); + + if (mjCacheKey == null) { + // Generate cache key if it has not been yet generated + mjCacheKey = MapJoinDesc.generateCacheKey(entry.getValue().getOperatorId()); + // Set in the conf of the map join operator + entry.getValue().getConf().setCacheKey(mjCacheKey); + } + // At this point we know is a single Key MapJoin so propagate key column info + byte posBigTable = (byte) entry.getValue().getConf().getPosBigTable(); + Byte[] order = entry.getValue().getConf().getTagOrder(); + Byte mjSmallTablePos = (order[0] == posBigTable ? order[1] : order[0]); + + List keyDesc = entry.getValue().getConf().getKeys().get(posBigTable); + ExprNodeColumnDesc keyCol = (ExprNodeColumnDesc) keyDesc.get(0); + + entry.getKey().setProbeDecodeContext(new TableScanOperator.ProbeDecodeContext(mjCacheKey, mjSmallTablePos, keyCol.getColumn())); + + LOG.debug("ProbeDecode MapJoin {} -> TS {} with CacheKey {} MapJoin Pos {} ColName {}", + entry.getValue().getName(), entry.getKey().getName(), mjCacheKey, mjSmallTablePos, keyCol.getColumn()); + } + return pctx; + } + + + // Is a MapJoin with a single Key of Number type (Long/Int/Short) + private boolean isValidMapJoin(MapJoinOperator mapJoinOp) { + Map> keyExprs = mapJoinOp.getConf().getKeys(); + List bigTableKeyExprs = keyExprs.get( (byte) mapJoinOp.getConf().getPosBigTable()); + return (bigTableKeyExprs.size() == 1) + && !(((PrimitiveTypeInfo) bigTableKeyExprs.get(0).getTypeInfo()).getPrimitiveCategory(). + equals(PrimitiveCategory.STRING) || + ((PrimitiveTypeInfo) bigTableKeyExprs.get(0).getTypeInfo()).getPrimitiveCategory(). 
+ equals(PrimitiveCategory.BYTE)); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 4441ea3215..87494d67f5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -196,6 +196,10 @@ public MapWork createMapWork(GenTezProcContext context, Operator root, mapWork.setIncludedBuckets(ts.getConf().getIncludedBuckets()); } + if (ts.getProbeDecodeContext() != null) { + mapWork.setProbeDecodeContext(ts.getProbeDecodeContext()); + } + // add new item to the tez work tezWork.add(mapWork); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index caab0564f2..c57a04532b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -87,6 +87,7 @@ import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization; import org.apache.hadoop.hive.ql.optimizer.MergeJoinProc; import org.apache.hadoop.hive.ql.optimizer.NonBlockingOpDeDupProc; +import org.apache.hadoop.hive.ql.optimizer.ProbeDecodeOptimizer; import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc; import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize; import org.apache.hadoop.hive.ql.optimizer.SetHashGroupByMinReduction; @@ -227,6 +228,12 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set inputs, } perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Shared scans optimization"); + perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); + if(procCtx.conf.getBoolVar(ConfVars.HIVE_MAPJOIN_PROBEDECODE_ENABLED)) { + new ProbeDecodeOptimizer().transform(procCtx.parseContext); + } + perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "ProbeDecode optimization"); + // need a new run of the constant folding because we might have created lots // of "and true and true" conditions. 
// Rather than run the full constant folding just need to shortcut AND/OR expressions diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index ef7e956fc7..045e06b421 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.plan; import org.apache.hadoop.hive.common.StringInternUtils; +import org.apache.hadoop.hive.ql.exec.TableScanOperator.ProbeDecodeContext; import org.apache.hadoop.hive.ql.exec.Utilities; import java.util.ArrayList; @@ -175,6 +176,8 @@ private boolean isMergeFromResolver; + private ProbeDecodeContext probeDecodeContext = null; + public MapWork() {} public MapWork(String name) { @@ -846,6 +849,14 @@ public void setVectorizationEnabledConditionsNotMet(Collection vectoriza return vectorizationEnabledConditionsNotMet; } + public ProbeDecodeContext getProbeDecodeContext() { + return probeDecodeContext; + } + + public void setProbeDecodeContext(ProbeDecodeContext probeDecodeContext) { + this.probeDecodeContext = probeDecodeContext; + } + public class MapExplainVectorization extends BaseExplainVectorization { private final MapWork mapWork; -- 2.20.1 (Apple Git-117) From 857f2a2c9837c4b0ff3c16c17ae6feb5970f9f95 Mon Sep 17 00:00:00 2001 From: Panos Garefalakis Date: Thu, 12 Mar 2020 22:50:39 +0000 Subject: [PATCH 2/6] Add Missing HiveConf --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 73f185a1f3..a7b56482de 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1768,6 +1768,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "use BloomFilter in Hybrid grace hash join to minimize unnecessary spilling."), HIVEMAPJOINFULLOUTER("hive.mapjoin.full.outer", true, "Whether to use MapJoin for FULL OUTER JOINs."), + HIVE_MAPJOIN_PROBEDECODE_ENABLED("hive.mapjoin.probedecode.enabled", false, + "Use cached MapJoin hashtable created on the small table side to filter out row columns that are not going\n "+ + "to be used when reading the large table data. This will result less CPU cycles spent for decoding unused data. "), HIVE_TEST_MAPJOINFULLOUTER_OVERRIDE( "hive.test.mapjoin.full.outer.override", "none", new StringSet("none", "enable", "disable"), -- 2.20.1 (Apple Git-117) From c4a626a15ebbb8a9d3672c731c362f6842fa7cfb Mon Sep 17 00:00:00 2001 From: Panos Garefalakis Date: Fri, 13 Mar 2020 10:49:07 +0000 Subject: [PATCH 3/6] Dumping some TODOs --- .../apache/hadoop/hive/ql/optimizer/ProbeDecodeOptimizer.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ProbeDecodeOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ProbeDecodeOptimizer.java index 9453f2967f..da61afaf46 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ProbeDecodeOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ProbeDecodeOptimizer.java @@ -94,6 +94,10 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { } } + // TODO: what if we have multiple MapJoins per TS? + + // TODO: some operators like VectorPTFEvaluatorStreamingDecimalMax do not allow selected -- take this into account here? 
+ // Propagate MapJoin information to the mapped TS operator (to be used by MapWork) for (Map.Entry entry: tableJoinMap.entrySet()) { -- 2.20.1 (Apple Git-117) From 74403c560ff55a8e5ba9dcc42f458f590574b9ae Mon Sep 17 00:00:00 2001 From: Panos Garefalakis Date: Tue, 24 Mar 2020 19:17:25 +0000 Subject: [PATCH 4/6] Moving probeDecode MapJoin logic as part of removeSemijoinsParallelToMapJoin in semijoinRemovalBasedTransformations. TODO: make sure to add some logic (maybe configurable) deciding which TS filtering to select when we have multiple. --- .../org/apache/hadoop/hive/conf/HiveConf.java | 5 +- .../hive/ql/exec/TableScanOperator.java | 12 +- .../ql/optimizer/ProbeDecodeOptimizer.java | 139 ------------------ .../hadoop/hive/ql/parse/GenTezUtils.java | 7 +- .../hadoop/hive/ql/parse/TezCompiler.java | 92 +++++++++--- 5 files changed, 84 insertions(+), 171 deletions(-) delete mode 100644 ql/src/java/org/apache/hadoop/hive/ql/optimizer/ProbeDecodeOptimizer.java diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a7b56482de..a0c5be772c 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1769,8 +1769,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVEMAPJOINFULLOUTER("hive.mapjoin.full.outer", true, "Whether to use MapJoin for FULL OUTER JOINs."), HIVE_MAPJOIN_PROBEDECODE_ENABLED("hive.mapjoin.probedecode.enabled", false, - "Use cached MapJoin hashtable created on the small table side to filter out row columns that are not going\n "+ - "to be used when reading the large table data. This will result less CPU cycles spent for decoding unused data. "), + "Find suitable table scan operators that could reduce the number of rows decoded at runtime using extra available information. \n" + + "e.g., use the cached MapJoin hashtable created on the small table side to filter out row columns that are not going "+ + "to be used when reading the large table data. 
This will result less CPU cycles spent for decoding unused data."), HIVE_TEST_MAPJOINFULLOUTER_OVERRIDE( "hive.test.mapjoin.full.outer.override", "none", new StringSet("none", "enable", "disable"), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index 20101e831d..f95e3491bd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -21,8 +21,10 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -84,7 +86,7 @@ private String schemaEvolutionColumns; private String schemaEvolutionColumnsTypes; - private ProbeDecodeContext probeDecodeContext = null; + private Set probeDecodeContextSet = new HashSet<>(); /** * Inner wrapper class for TS ProbeDecode optimization @@ -466,12 +468,12 @@ public VectorizationContext getOutputVectorizationContext() { return taskVectorizationContext; } - public ProbeDecodeContext getProbeDecodeContext() { - return probeDecodeContext; + public Set getProbeDecodeContextSet() { + return probeDecodeContextSet; } - public void setProbeDecodeContext(ProbeDecodeContext probeDecodeContext) { - this.probeDecodeContext = probeDecodeContext; + public void addProbeDecodeContext(ProbeDecodeContext probeDecodeContext) { + this.probeDecodeContextSet.add(probeDecodeContext); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ProbeDecodeOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ProbeDecodeOptimizer.java deleted file mode 100644 index da61afaf46..0000000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ProbeDecodeOptimizer.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.optimizer; - -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.TableScanOperator; -import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; -import org.apache.hadoop.hive.ql.plan.MapJoinDesc; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Deque; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -/** - *

The goal of this rule is to find suitable table scan operators that could reduce the - * number of rows decoded at runtime using extra available information. - * - *

First the rule checks all the available MapJoin operators that could propagate the - * smaller HashTable on the probing side (where the TS is) to filter out rows that would - * never match. To do so, the HashTable information is pushed down to the TS properties. - * If a single TS is used by multiple operators (shared-work), this rule - * cannot be applied. - * - *

Second this rule can be extended to support static filter expressions like: - * select * from sales where sold_state = 'PR'; - * - *

The optimization only works with the Tez execution engine running on Llap. - * - */ -public class ProbeDecodeOptimizer extends Transform { - - private final static Logger LOG = LoggerFactory.getLogger(ProbeDecodeOptimizer.class); - - @Override - public ParseContext transform(ParseContext pctx) throws SemanticException { - - // Get all TS operators - final List> topOps = new ArrayList<>(); - topOps.addAll(pctx.getTopOps().values()); - - if (topOps.isEmpty()) { - // Nothing to do here - is this even possible? - return pctx; - } - - Map tableJoinMap = new HashMap<>(); - for (Operator parentOp: topOps) { - // Make sure every parent Operator has a single branch and there is no shared-work - if (parentOp.getNumChild() > 1) { - continue; - } - - // Traverse graph and find MapJoins - Deque> deque = new LinkedList<>(); - deque.add(parentOp); - while (!deque.isEmpty()) { - Operator op = deque.pollLast(); - if (op instanceof MapJoinOperator) { - // MapJoin candidate - MapJoinOperator mop = (MapJoinOperator) op; - // Make sure this is a valid single (Number) key MapJoin and not Dynamic or BucketMapJoin - if (!mop.getConf().isBucketMapJoin() && !mop.getConf().isDynamicPartitionHashJoin() && isValidMapJoin(mop)) { - LOG.debug("ProbeDecode Mapping TS {} -> MJ {}", parentOp.getOperatorId(), mop.getOperatorId()); - tableJoinMap.put((TableScanOperator) parentOp, mop); - } - } - deque.addAll(op.getChildOperators()); - } - } - - // TODO: what if we have multiple MapJoins per TS? - - // TODO: some operators like VectorPTFEvaluatorStreamingDecimalMax do not allow selected -- take this into account here? - - // Propagate MapJoin information to the mapped TS operator (to be used by MapWork) - for (Map.Entry entry: tableJoinMap.entrySet()) { - - String mjCacheKey = entry.getValue().getConf().getCacheKey(); - - if (mjCacheKey == null) { - // Generate cache key if it has not been yet generated - mjCacheKey = MapJoinDesc.generateCacheKey(entry.getValue().getOperatorId()); - // Set in the conf of the map join operator - entry.getValue().getConf().setCacheKey(mjCacheKey); - } - // At this point we know is a single Key MapJoin so propagate key column info - byte posBigTable = (byte) entry.getValue().getConf().getPosBigTable(); - Byte[] order = entry.getValue().getConf().getTagOrder(); - Byte mjSmallTablePos = (order[0] == posBigTable ? order[1] : order[0]); - - List keyDesc = entry.getValue().getConf().getKeys().get(posBigTable); - ExprNodeColumnDesc keyCol = (ExprNodeColumnDesc) keyDesc.get(0); - - entry.getKey().setProbeDecodeContext(new TableScanOperator.ProbeDecodeContext(mjCacheKey, mjSmallTablePos, keyCol.getColumn())); - - LOG.debug("ProbeDecode MapJoin {} -> TS {} with CacheKey {} MapJoin Pos {} ColName {}", - entry.getValue().getName(), entry.getKey().getName(), mjCacheKey, mjSmallTablePos, keyCol.getColumn()); - } - return pctx; - } - - - // Is a MapJoin with a single Key of Number type (Long/Int/Short) - private boolean isValidMapJoin(MapJoinOperator mapJoinOp) { - Map> keyExprs = mapJoinOp.getConf().getKeys(); - List bigTableKeyExprs = keyExprs.get( (byte) mapJoinOp.getConf().getPosBigTable()); - return (bigTableKeyExprs.size() == 1) - && !(((PrimitiveTypeInfo) bigTableKeyExprs.get(0).getTypeInfo()).getPrimitiveCategory(). - equals(PrimitiveCategory.STRING) || - ((PrimitiveTypeInfo) bigTableKeyExprs.get(0).getTypeInfo()).getPrimitiveCategory(). 
- equals(PrimitiveCategory.BYTE)); - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 87494d67f5..53ae08de7b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -196,8 +196,11 @@ public MapWork createMapWork(GenTezProcContext context, Operator root, mapWork.setIncludedBuckets(ts.getConf().getIncludedBuckets()); } - if (ts.getProbeDecodeContext() != null) { - mapWork.setProbeDecodeContext(ts.getProbeDecodeContext()); + if (!ts.getProbeDecodeContextSet().isEmpty()) { + // TODO: Add some logic here? + // TODO: what if we have multiple MapJoins per TS? + // TODO: some operators like VectorPTFEvaluatorStreamingDecimalMax do not allow selected -- take this into account here? + mapWork.setProbeDecodeContext(ts.getProbeDecodeContextSet().iterator().next()); } // add new item to the tez work diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index c57a04532b..ca481f83dc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -87,7 +87,6 @@ import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization; import org.apache.hadoop.hive.ql.optimizer.MergeJoinProc; import org.apache.hadoop.hive.ql.optimizer.NonBlockingOpDeDupProc; -import org.apache.hadoop.hive.ql.optimizer.ProbeDecodeOptimizer; import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc; import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize; import org.apache.hadoop.hive.ql.optimizer.SetHashGroupByMinReduction; @@ -126,6 +125,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -139,6 +139,8 @@ import org.apache.hadoop.hive.ql.stats.OperatorStats; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -228,12 +230,6 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set inputs, } perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Shared scans optimization"); - perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); - if(procCtx.conf.getBoolVar(ConfVars.HIVE_MAPJOIN_PROBEDECODE_ENABLED)) { - new ProbeDecodeOptimizer().transform(procCtx.parseContext); - } - perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "ProbeDecode optimization"); - // need a new run of the constant folding because we might have created lots // of "and true and true" conditions. 
// Rather than run the full constant folding just need to shortcut AND/OR expressions @@ -1317,7 +1313,8 @@ private static void runTopNKeyOptimization(OptimizeTezProcContext procCtx) private boolean findParallelSemiJoinBranch(Operator mapjoin, TableScanOperator bigTableTS, ParseContext parseContext, - Map semijoins) { + Map semijoins, + Map mapJoins) { boolean parallelEdges = false; for (Operator op : mapjoin.getParentOperators()) { @@ -1384,12 +1381,10 @@ private boolean findParallelSemiJoinBranch(Operator mapjoin, TableScanOperato parallelEdges = true; - if (sjInfo.getIsHint() || !sjInfo.getShouldRemove()) { - // Created by hint, skip it - continue; - } // Add the semijoin branch to the map semijoins.put(rs, ts); + // Add mapJoin branch to probeDecode map + mapJoins.put((MapJoinOperator) mapjoin, ts); } } } @@ -1450,11 +1445,14 @@ private void removeSemiJoinEdgesForUnion(OptimizeTezProcContext procCtx) throws * The algorithm looks at all the mapjoins in the operator pipeline until * it hits RS Op and for each mapjoin examines if it has paralllel semijoin * edge or dynamic partition pruning. + * + * As an extension, the algorithm also looks for suitable table scan operators that + * could reduce the number of rows decoded at runtime using the information provided by + * the MapJoin operators of the branch when ProbeDecode feature is enabled. */ private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx) throws SemanticException { - if(!procCtx.conf.getBoolVar(ConfVars.HIVECONVERTJOIN) || - procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_FOR_MAPJOIN)) { + if (!procCtx.conf.getBoolVar(ConfVars.HIVECONVERTJOIN)) { // Not needed without semi-join reduction or mapjoins or when semijoins // are enabled for parallel mapjoins. return; @@ -1465,6 +1463,7 @@ private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx) topOps.addAll(procCtx.parseContext.getTopOps().values()); Map semijoins = new HashMap<>(); + Map mapJoins = new HashMap<>(); for (Operator parent : topOps) { // A TS can have multiple branches due to DPP Or Semijoin Opt. // USe DFS to traverse all the branches until RS is hit. @@ -1480,7 +1479,7 @@ private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx) if (op instanceof MapJoinOperator) { // A candidate. if (!findParallelSemiJoinBranch(op, (TableScanOperator) parent, - procCtx.parseContext, semijoins)) { + procCtx.parseContext, semijoins, mapJoins)) { // No parallel edge was found for the given mapjoin op, // no need to go down further, skip this TS operator pipeline. break; @@ -1490,19 +1489,66 @@ private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx) } } - if (semijoins.size() > 0) { - for (ReduceSinkOperator rs : semijoins.keySet()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Semijoin optimization with parallel edge to map join. Removing semijoin " - + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(semijoins.get(rs))); + if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_FOR_MAPJOIN)) { + if (semijoins.size() > 0) { + for (Entry semiEntry : semijoins.entrySet()) { + SemiJoinBranchInfo sjInfo = procCtx.parseContext.getRsToSemiJoinBranchInfo().get(semiEntry.getKey()); + if (sjInfo.getIsHint() || !sjInfo.getShouldRemove()) { + // Created by hint, skip it + continue; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Semijoin optimization with parallel edge to map join. 
Removing semijoin " + + OperatorUtils.getOpNamePretty(semiEntry.getKey()) + " - " + OperatorUtils.getOpNamePretty(semiEntry.getValue())); + } + GenTezUtils.removeBranch(semiEntry.getKey()); + GenTezUtils.removeSemiJoinOperator(procCtx.parseContext, semiEntry.getKey(), semiEntry.getValue()); + } + } + } + if (procCtx.conf.getBoolVar(ConfVars.HIVE_MAPJOIN_PROBEDECODE_ENABLED)) { + if (mapJoins.size() > 0) { + for (Entry smj : mapJoins.entrySet()) { + if (!isValidProbeDecodeMapJoin(smj.getKey())) { + continue; + } + String mjCacheKey = smj.getKey().getConf().getCacheKey(); + if (mjCacheKey == null) { + // Generate cache key if it has not been yet generated + mjCacheKey = MapJoinDesc.generateCacheKey(smj.getValue().getOperatorId()); + // Set in the conf of the map join operator + smj.getKey().getConf().setCacheKey(mjCacheKey); + } + // At this point we know it is a single Key MapJoin + byte posBigTable = (byte) smj.getKey().getConf().getPosBigTable(); + Byte[] order = smj.getKey().getConf().getTagOrder(); + Byte mjSmallTablePos = (order[0] == posBigTable ? order[1] : order[0]); + + List keyDesc = smj.getKey().getConf().getKeys().get(posBigTable); + ExprNodeColumnDesc keyCol = (ExprNodeColumnDesc) keyDesc.get(0); + if (LOG.isDebugEnabled()) { + LOG.debug("ProbeDecode MapJoin Op {} for TS {} with CacheKey {} MapJoin Pos {} ColName {}", + smj.getValue().getName(), smj.getKey().getName(), mjCacheKey, mjSmallTablePos, keyCol.getColumn()); + } + TableScanOperator.ProbeDecodeContext currCtx = + new TableScanOperator.ProbeDecodeContext(mjCacheKey, mjSmallTablePos, keyCol.getColumn()); + smj.getValue().addProbeDecodeContext(currCtx); } - GenTezUtils.removeBranch(rs); - GenTezUtils.removeSemiJoinOperator(procCtx.parseContext, rs, - semijoins.get(rs)); } } } + // Valid MapJoin with a single Key of Number type (Long/Int/Short) + private static boolean isValidProbeDecodeMapJoin(MapJoinOperator mapJoinOp) { + Map> keyExprs = mapJoinOp.getConf().getKeys(); + List bigTableKeyExprs = keyExprs.get( (byte) mapJoinOp.getConf().getPosBigTable()); + return (bigTableKeyExprs.size() == 1) + && !(((PrimitiveTypeInfo) bigTableKeyExprs.get(0).getTypeInfo()).getPrimitiveCategory(). + equals(PrimitiveObjectInspector.PrimitiveCategory.STRING) || + ((PrimitiveTypeInfo) bigTableKeyExprs.get(0).getTypeInfo()).getPrimitiveCategory(). + equals(PrimitiveObjectInspector.PrimitiveCategory.BYTE)); + } + private static boolean canUseNDV(ColStatistics colStats) { return (colStats != null) && (colStats.getCountDistint() >= 0); } -- 2.20.1 (Apple Git-117) From bbfa84fd2067c5da5bff54ba3fe9bcdd3d867551 Mon Sep 17 00:00:00 2001 From: Panos Garefalakis Date: Mon, 6 Apr 2020 19:53:44 +0100 Subject: [PATCH 5/6] More generic probeDecode name for HiveConf. Extend TezCompiler to use configurable logic to select a MJ when we have multiple per TS -- currently picking the one with the lowest MJ/TS key Ratio. Making sure that SharedWork TS does not contain a probeContext as it would result in wrong results. Extending TSDesc to include probeContext as part of the explained plan. 
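For reviewers, a rough standalone sketch of the "lowest MJ/TS key Ratio" policy mentioned above. The class and method names (Candidate, ndvRatio, selectLowestRatio) are made up for illustration only and are not part of this patch; the actual selection happens in TezCompiler using the MapJoin/TableScan column statistics.

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class LowestRatioPolicySketch {
  // One probeDecode candidate: a MapJoin summarised by the two NDVs the policy compares.
  static final class Candidate {
    final String name;
    final long smallTableKeyNdv; // distinct join keys on the hashtable (build) side
    final long bigTableKeyNdv;   // distinct join keys on the probed TableScan side

    Candidate(String name, long smallTableKeyNdv, long bigTableKeyNdv) {
      this.name = name;
      this.smallTableKeyNdv = smallTableKeyNdv;
      this.bigTableKeyNdv = bigTableKeyNdv;
    }

    // Lower ratio means the small-table hashtable is expected to filter out more scanned rows.
    double ndvRatio() {
      return smallTableKeyNdv / (double) bigTableKeyNdv;
    }
  }

  // Pick the candidate with the lowest key ratio, if any candidate exists.
  static Optional<Candidate> selectLowestRatio(List<Candidate> candidates) {
    return candidates.stream().min(Comparator.comparingDouble(Candidate::ndvRatio));
  }

  public static void main(String[] args) {
    List<Candidate> mapJoins = Arrays.asList(
        new Candidate("MJ_item_dim", 5, 1000),    // ratio 0.005
        new Candidate("MJ_seller_dim", 2, 1000)); // ratio 0.002 -> selected
    selectLowestRatio(mapJoins).ifPresent(c ->
        System.out.println("probeDecode would use " + c.name + " (ratio=" + c.ndvRatio() + ")"));
  }
}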
Change-Id: I90f5ebd8e4f90990c17f89ef75af39bc335f8e7c --- .../org/apache/hadoop/hive/conf/HiveConf.java | 10 +- .../hive/ql/exec/TableScanOperator.java | 23 ++- .../ql/optimizer/SharedWorkOptimizer.java | 9 ++ .../hadoop/hive/ql/parse/GenTezUtils.java | 8 +- .../hadoop/hive/ql/parse/TezCompiler.java | 137 +++++++++++++----- .../hadoop/hive/ql/plan/TableScanDesc.java | 25 +++- 6 files changed, 162 insertions(+), 50 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a0c5be772c..25884e6f26 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1768,10 +1768,6 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "use BloomFilter in Hybrid grace hash join to minimize unnecessary spilling."), HIVEMAPJOINFULLOUTER("hive.mapjoin.full.outer", true, "Whether to use MapJoin for FULL OUTER JOINs."), - HIVE_MAPJOIN_PROBEDECODE_ENABLED("hive.mapjoin.probedecode.enabled", false, - "Find suitable table scan operators that could reduce the number of rows decoded at runtime using extra available information. \n" - + "e.g., use the cached MapJoin hashtable created on the small table side to filter out row columns that are not going "+ - "to be used when reading the large table data. This will result less CPU cycles spent for decoding unused data."), HIVE_TEST_MAPJOINFULLOUTER_OVERRIDE( "hive.test.mapjoin.full.outer.override", "none", new StringSet("none", "enable", "disable"), @@ -2461,6 +2457,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "The comma-separated list of SerDe classes that are considered when enhancing table-properties \n" + "during logical optimization."), + HIVE_OPTIMIZE_SCAN_PROBEDECODE("hive.optimize.scan.probedecode", false, + "Whether to find suitable table scan operators that could reduce the number of decoded rows at runtime by probing extra available information. \n" + + "The probe side for the row-level filtering is generated either statically in the case of expressions or dynamically for joins, " + "e.g., use the cached MapJoin hashtable created on the small table side to filter out row columns that are not going " + + "to be used when reading the large table data.
This will result in fewer CPU cycles spent decoding unused data."), + + // CTE HIVE_CTE_MATERIALIZE_THRESHOLD("hive.optimize.cte.materialize.threshold", -1, "If the number of references to a CTE clause exceeds this threshold, Hive will materialize it\n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index f95e3491bd..ea66fc7dc9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -86,7 +86,7 @@ private String schemaEvolutionColumns; private String schemaEvolutionColumnsTypes; - private Set probeDecodeContextSet = new HashSet<>(); + private ProbeDecodeContext probeDecodeContextSet; /** * Inner wrapper class for TS ProbeDecode optimization @@ -96,11 +96,14 @@ private final String mjSmallTableCacheKey; private final String mjBigTableKeyColName; private final byte mjSmallTablePos; + private final double keyRatio; - public ProbeDecodeContext(String mjSmallTableCacheKey, byte mjSmallTablePos, String mjBigTableKeyColName) { + public ProbeDecodeContext(String mjSmallTableCacheKey, byte mjSmallTablePos, String mjBigTableKeyColName, + double keyRatio) { this.mjSmallTableCacheKey = mjSmallTableCacheKey; this.mjSmallTablePos = mjSmallTablePos; this.mjBigTableKeyColName = mjBigTableKeyColName; + this.keyRatio = keyRatio; } public String getMjSmallTableCacheKey() { @@ -114,6 +117,16 @@ public byte getMjSmallTablePos() { public String getMjBigTableKeyColName() { return mjBigTableKeyColName; } + + public double getKeyRatio() { + return keyRatio; + } + + @Override + public String toString() { + return "cacheKey:" + mjSmallTableCacheKey + ", bigKeyColName:" + mjBigTableKeyColName + + ", smallTablePos:" + mjSmallTablePos + ", keyRatio:" + keyRatio; + } } @@ -468,12 +481,12 @@ public VectorizationContext getOutputVectorizationContext() { return taskVectorizationContext; } - public Set getProbeDecodeContextSet() { + public ProbeDecodeContext getProbeDecodeContext() { return probeDecodeContextSet; } - public void addProbeDecodeContext(ProbeDecodeContext probeDecodeContext) { - this.probeDecodeContextSet.add(probeDecodeContext); + public void setProbeDecodeContext(ProbeDecodeContext probeDecodeContext) { + this.probeDecodeContextSet = probeDecodeContext; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java index 2f49985800..caed527d71 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java @@ -487,6 +487,15 @@ private static boolean sharedWorkOptimization(ParseContext pctx, SharedWorkOptim } LOG.debug("Input operator removed: {}", op); } + + // A shared TSop across branches cannot have a probeContext that utilizes single-branch info + // Filtered-out rows from one branch might be needed by another branch sharing a TSop + if (retainableTsOp.getProbeDecodeContext() != null) { + LOG.debug("Removing probeDecodeCntx for merged TS op {}", retainableTsOp); + retainableTsOp.setProbeDecodeContext(null); + retainableTsOp.getConf().setProbeDecodeContext(null); + } + // Then we merge the operators of the works we are going to merge optimizerCache.removeOpAndCombineWork(discardableTsOp, retainableTsOp); removedOps.add(discardableTsOp); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 53ae08de7b..027f73f9d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -196,11 +196,9 @@ public MapWork createMapWork(GenTezProcContext context, Operator root, mapWork.setIncludedBuckets(ts.getConf().getIncludedBuckets()); } - if (!ts.getProbeDecodeContextSet().isEmpty()) { - // TODO: Add some logic here? - // TODO: what if we have multiple MapJoins per TS? - // TODO: some operators like VectorPTFEvaluatorStreamingDecimalMax do not allow selected -- take this into account here? - mapWork.setProbeDecodeContext(ts.getProbeDecodeContextSet().iterator().next()); + if (ts.getProbeDecodeContext() != null) { + // TODO: some operators like VectorPTFEvaluator do not allow the use of Selected take this into account here? + mapWork.setProbeDecodeContext(ts.getProbeDecodeContext()); } // add new item to the tez work diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index ca481f83dc..d07ff21b7f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -1314,7 +1314,7 @@ private static void runTopNKeyOptimization(OptimizeTezProcContext procCtx) private boolean findParallelSemiJoinBranch(Operator mapjoin, TableScanOperator bigTableTS, ParseContext parseContext, Map semijoins, - Map mapJoins) { + Map> probeDecodeMJoins) { boolean parallelEdges = false; for (Operator op : mapjoin.getParentOperators()) { @@ -1383,8 +1383,11 @@ private boolean findParallelSemiJoinBranch(Operator mapjoin, TableScanOperato // Add the semijoin branch to the map semijoins.put(rs, ts); - // Add mapJoin branch to probeDecode map - mapJoins.put((MapJoinOperator) mapjoin, ts); + // Add mapJoin branch to probeDecode table + if (!probeDecodeMJoins.containsKey(ts)){ + probeDecodeMJoins.put(ts, new ArrayList<>()); + } + probeDecodeMJoins.get(ts).add((MapJoinOperator) mapjoin); } } } @@ -1453,8 +1456,7 @@ private void removeSemiJoinEdgesForUnion(OptimizeTezProcContext procCtx) throws private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx) throws SemanticException { if (!procCtx.conf.getBoolVar(ConfVars.HIVECONVERTJOIN)) { - // Not needed without semi-join reduction or mapjoins or when semijoins - // are enabled for parallel mapjoins. + // Not needed without mapjoin conversion return; } @@ -1463,7 +1465,7 @@ private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx) topOps.addAll(procCtx.parseContext.getTopOps().values()); Map semijoins = new HashMap<>(); - Map mapJoins = new HashMap<>(); + Map> probeDecodeMJoins = new HashMap<>(); for (Operator parent : topOps) { // A TS can have multiple branches due to DPP Or Semijoin Opt. // USe DFS to traverse all the branches until RS is hit. @@ -1479,7 +1481,7 @@ private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx) if (op instanceof MapJoinOperator) { // A candidate. if (!findParallelSemiJoinBranch(op, (TableScanOperator) parent, - procCtx.parseContext, semijoins, mapJoins)) { + procCtx.parseContext, semijoins, probeDecodeMJoins)) { // No parallel edge was found for the given mapjoin op, // no need to go down further, skip this TS operator pipeline. 
break; @@ -1488,7 +1490,7 @@ private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx) deque.addAll(op.getChildOperators()); } } - + // No need to remove SJ branches when we have semi-join reduction or when semijoins are enabled for parallel mapjoins. if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_FOR_MAPJOIN)) { if (semijoins.size() > 0) { for (Entry semiEntry : semijoins.entrySet()) { @@ -1506,36 +1508,102 @@ private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx) } } } - if (procCtx.conf.getBoolVar(ConfVars.HIVE_MAPJOIN_PROBEDECODE_ENABLED)) { - if (mapJoins.size() > 0) { - for (Entry smj : mapJoins.entrySet()) { - if (!isValidProbeDecodeMapJoin(smj.getKey())) { - continue; - } - String mjCacheKey = smj.getKey().getConf().getCacheKey(); - if (mjCacheKey == null) { - // Generate cache key if it has not been yet generated - mjCacheKey = MapJoinDesc.generateCacheKey(smj.getValue().getOperatorId()); - // Set in the conf of the map join operator - smj.getKey().getConf().setCacheKey(mjCacheKey); - } - // At this point we know it is a single Key MapJoin - byte posBigTable = (byte) smj.getKey().getConf().getPosBigTable(); - Byte[] order = smj.getKey().getConf().getTagOrder(); - Byte mjSmallTablePos = (order[0] == posBigTable ? order[1] : order[0]); + if (procCtx.conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_SCAN_PROBEDECODE)) { + if (probeDecodeMJoins.size() > 0) { + // When multiple MJ, select one based on a policy + for (Map.Entry> probeTsMap : probeDecodeMJoins.entrySet()){ + TableScanOperator.ProbeDecodeContext tsCntx = null; + // Currently supporting: LowestRatio policy + // TODO: Add more policies and make the selection a conf property + tsCntx = selectLowestRatioProbeDecodeMapJoin(probeTsMap.getKey(), probeTsMap.getValue()); + LOG.debug("ProbeDecode MJ for TS {} with CacheKey {} MJ Pos {} ColName {} with Ratio {}", + probeTsMap.getKey().getName(), tsCntx.getMjSmallTableCacheKey(), tsCntx.getMjSmallTablePos(), + tsCntx.getMjBigTableKeyColName(), tsCntx.getKeyRatio()); + probeTsMap.getKey().setProbeDecodeContext(tsCntx); + probeTsMap.getKey().getConf().setProbeDecodeContext(tsCntx); + } + } + } + } - List keyDesc = smj.getKey().getConf().getKeys().get(posBigTable); - ExprNodeColumnDesc keyCol = (ExprNodeColumnDesc) keyDesc.get(0); - if (LOG.isDebugEnabled()) { - LOG.debug("ProbeDecode MapJoin Op {} for TS {} with CacheKey {} MapJoin Pos {} ColName {}", - smj.getValue().getName(), smj.getKey().getName(), mjCacheKey, mjSmallTablePos, keyCol.getColumn()); - } - TableScanOperator.ProbeDecodeContext currCtx = - new TableScanOperator.ProbeDecodeContext(mjCacheKey, mjSmallTablePos, keyCol.getColumn()); - smj.getValue().addProbeDecodeContext(currCtx); + private static TableScanOperator.ProbeDecodeContext selectLowestRatioProbeDecodeMapJoin(TableScanOperator tsOp, + List mjOps){ + MapJoinOperator selectedMJOp = null; + double selectedMJOpRatio = 0; + for (MapJoinOperator currMJOp : mjOps) { + if (!isValidProbeDecodeMapJoin(currMJOp)) { + continue; + } + // At this point we know it is a single Key MapJoin + if (selectedMJOp == null) { + // Set the first valid MJ + selectedMJOp = currMJOp; + selectedMJOpRatio = getProbeDecodeNDVRatio(tsOp, currMJOp); + LOG.debug("ProbeDecode MJ {} with Ratio {}", selectedMJOp, selectedMJOpRatio); + } else { + double currMJRatio = getProbeDecodeNDVRatio(tsOp, currMJOp); + if (currMJRatio < selectedMJOpRatio){ + LOG.debug("ProbeDecode MJ {} Ratio {} is lower than existing MJ {} with Ratio {}", + currMJOp, 
currMJRatio, selectedMJOp, selectedMJOpRatio); + selectedMJOp = currMJOp; + selectedMJOpRatio = currMJRatio; } } } + + TableScanOperator.ProbeDecodeContext tsProbeDecodeCtx = null; + // If there a valid MJ to be used for TS probeDecode make sure the MJ cache key is generated and + // then propagate the new ProbeDecodeContext (to be used by LLap IO when executing the TSop) + if (selectedMJOp != null) { + String mjCacheKey = selectedMJOp.getConf().getCacheKey(); + if (mjCacheKey == null) { + // Generate cache key if it has not been yet generated + mjCacheKey = MapJoinDesc.generateCacheKey(selectedMJOp.getOperatorId()); + // Set in the conf of the map join operator + selectedMJOp.getConf().setCacheKey(mjCacheKey); + } + + byte posBigTable = (byte) selectedMJOp.getConf().getPosBigTable(); + Byte[] order = selectedMJOp.getConf().getTagOrder(); + Byte mjSmallTablePos = (order[0] == posBigTable ? order[1] : order[0]); + + List keyDesc = selectedMJOp.getConf().getKeys().get(posBigTable); + ExprNodeColumnDesc keyCol = (ExprNodeColumnDesc) keyDesc.get(0); + + tsProbeDecodeCtx = new TableScanOperator.ProbeDecodeContext(mjCacheKey, mjSmallTablePos, + keyCol.getColumn(), selectedMJOpRatio); + } + return tsProbeDecodeCtx; + } + + // Return the ratio of: (distinct) JOIN_probe_key_column_rows / (distinct) JOIN_TS_target_column_rows + private static double getProbeDecodeNDVRatio(TableScanOperator tsOp, MapJoinOperator mjOp) { + long mjKeyCardinality = mjOp.getStatistics().getNumRows(); + long tsKeyCardinality = tsOp.getStatistics().getNumRows(); + + byte posBigTable = (byte) mjOp.getConf().getPosBigTable(); + + Byte[] order = mjOp.getConf().getTagOrder(); + Byte mjSmallTablePos = (order[0] == posBigTable ? order[1] : order[0]); + Byte mjBigTablePos = (order[0] == posBigTable ? 
order[0] : order[1]); + + // Single Key MJ at this point + List tsKeyDesc = mjOp.getConf().getKeys().get(mjBigTablePos); + ExprNodeColumnDesc tsKeyCol = (ExprNodeColumnDesc) tsKeyDesc.get(0); + + List mjKeyDesc = mjOp.getConf().getKeys().get(mjSmallTablePos); + ExprNodeColumnDesc mjKeyCol = (ExprNodeColumnDesc) mjKeyDesc.get(0); + + ColStatistics mjStats = mjOp.getStatistics().getColumnStatisticsFromColName(mjKeyCol.getColumn()); + ColStatistics tsStats = tsOp.getStatistics().getColumnStatisticsFromColName(tsKeyCol.getColumn()); + + if (canUseNDV(mjStats)) { + mjKeyCardinality = mjStats.getCountDistint(); + } + if (canUseNDV(tsStats)) { + tsKeyCardinality = tsStats.getCountDistint(); + } + return mjKeyCardinality / (double) tsKeyCardinality; } // Valid MapJoin with a single Key of Number type (Long/Int/Short) @@ -1770,6 +1838,7 @@ private static void sortSemijoinFilters(OptimizeTezProcContext procCtx, private void removeSemijoinOptimizationByBenefit(OptimizeTezProcContext procCtx) throws SemanticException { + Map map = procCtx.parseContext.getRsToSemiJoinBranchInfo(); if (map.isEmpty()) { // Nothing to do diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index d2e22c8388..53a6036298 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.plan; import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.Table; @@ -115,6 +116,8 @@ private AcidUtils.AcidOperationalProperties acidOperationalProperties = null; + private TableScanOperator.ProbeDecodeContext probeDecodeContext = null; + private transient TableSample tableSample; private transient Table tableMetadata; @@ -133,10 +136,15 @@ public TableScanDesc(Table tblMetadata) { } public TableScanDesc(final String alias, Table tblMetadata) { - this(alias, null, tblMetadata); + this(alias, null, tblMetadata, null); } public TableScanDesc(final String alias, List vcs, Table tblMetadata) { + this(alias, vcs, tblMetadata, null); + } + + public TableScanDesc(final String alias, List vcs, Table tblMetadata, + TableScanOperator.ProbeDecodeContext probeDecodeContext) { this.alias = alias; this.virtualCols = vcs; this.tableMetadata = tblMetadata; @@ -149,12 +157,13 @@ public TableScanDesc(final String alias, List vcs, Table tblMetad if (isTranscationalTable) { acidOperationalProperties = AcidUtils.getAcidOperationalProperties(this.tableMetadata); } + this.probeDecodeContext = probeDecodeContext; } @Override public Object clone() { List vcs = new ArrayList(getVirtualCols()); - return new TableScanDesc(getAlias(), vcs, this.tableMetadata); + return new TableScanDesc(getAlias(), vcs, this.tableMetadata, this.probeDecodeContext); } @Explain(displayName = "alias") @@ -238,6 +247,18 @@ public void setFilterExpr(ExprNodeGenericFuncDesc filterExpr) { this.filterExpr = filterExpr; } + @Explain(displayName = "probeDecodeDetails", explainLevels = { Level.DEFAULT, Level.EXTENDED }) + public String getProbeDecodeString() { + if (probeDecodeContext == null) { + return null; + } + return probeDecodeContext.toString(); + } + + public void setProbeDecodeContext(TableScanOperator.ProbeDecodeContext probeDecodeContext) { + 
this.probeDecodeContext = probeDecodeContext; + } + public Serializable getFilterObject() { return filterObject; } -- 2.20.1 (Apple Git-117) From a7ba8736ec98cab693ca0a3d07508aa2b63bc9df Mon Sep 17 00:00:00 2001 From: Panos Garefalakis Date: Mon, 6 Apr 2020 19:54:56 +0100 Subject: [PATCH 6/6] Adding probeDecode test with and without stats enabled Change-Id: I995ba97355ab1a29bf2e23f21ab641cffba19ce3 --- .../resources/testconfiguration.properties | 2 + .../probedecode_mapjoin_simple.q | 31 ++ .../probedecode_mapjoin_stats.q | 55 +++ .../llap/probedecode_mapjoin_simple.q.out | 282 ++++++++++++ .../llap/probedecode_mapjoin_stats.q.out | 419 ++++++++++++++++++ 5 files changed, 789 insertions(+) create mode 100644 ql/src/test/queries/clientpositive/probedecode_mapjoin_simple.q create mode 100644 ql/src/test/queries/clientpositive/probedecode_mapjoin_stats.q create mode 100644 ql/src/test/results/clientpositive/llap/probedecode_mapjoin_simple.q.out create mode 100644 ql/src/test/results/clientpositive/llap/probedecode_mapjoin_stats.q.out diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index e99ce7babb..35a709be22 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -540,6 +540,8 @@ minillaplocal.query.files=\ cross_prod_3.q,\ cross_prod_4.q,\ dpp.q,\ + probedecode_mapjoin_stats.q,\ + probedecode_mapjoin_simple.q,\ dynamic_partition_pruning.q,\ dynamic_partition_join_noncbo.q,\ dynamic_semijoin_reduction.q,\ diff --git a/ql/src/test/queries/clientpositive/probedecode_mapjoin_simple.q b/ql/src/test/queries/clientpositive/probedecode_mapjoin_simple.q new file mode 100644 index 0000000000..90550e23e0 --- /dev/null +++ b/ql/src/test/queries/clientpositive/probedecode_mapjoin_simple.q @@ -0,0 +1,31 @@ +set hive.stats.column.autogather=false; +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; +SET hive.auto.convert.join=true; +SET hive.auto.convert.join.noconditionaltask=true; +SET hive.auto.convert.join.noconditionaltask.size=1000000000; +SET hive.vectorized.execution.enabled=true; +set hive.vectorized.execution.mapjoin.native.fast.hashtable.enabled=true; +set hive.fetch.task.conversion=none; +SET mapred.min.split.size=1000; +SET mapred.max.split.size=5000; + +CREATE TABLE item_dim (key1 int, name string) stored as ORC; +CREATE TABLE orders_fact (nokey int, key2 int, dt timestamp) stored as ORC; + +INSERT INTO item_dim values(101, "Item 101"); +INSERT INTO item_dim values(102, "Item 102"); + +INSERT INTO orders_fact values(12345, 101, '2001-01-30 00:00:00'); +INSERT INTO orders_fact values(23456, 104, '2002-02-30 00:00:00'); +INSERT INTO orders_fact values(34567, 108, '2003-03-30 00:00:00'); +INSERT INTO orders_fact values(45678, 102, '2004-04-30 00:00:00'); +INSERT INTO orders_fact values(56789, 109, '2005-05-30 00:00:00'); +INSERT INTO orders_fact values(67891, 110, '2006-06-30 00:00:00'); + +SET hive.optimize.scan.probedecode=true; + +EXPLAIN VECTORIZATION DETAIL select key1, key2, name, dt from orders_fact join item_dim on (orders_fact.key2 = item_dim.key1); + +-- two keys match, the remaining rows can be skipped +select key1, key2, name, dt from orders_fact join item_dim on (orders_fact.key2 = item_dim.key1); diff --git a/ql/src/test/queries/clientpositive/probedecode_mapjoin_stats.q b/ql/src/test/queries/clientpositive/probedecode_mapjoin_stats.q new file mode 100644 index 0000000000..0dde302695 --- /dev/null +++ 
b/ql/src/test/queries/clientpositive/probedecode_mapjoin_stats.q @@ -0,0 +1,55 @@ +SET hive.auto.convert.join=true; +SET hive.auto.convert.join.noconditionaltask=true; +SET hive.auto.convert.join.noconditionaltask.size=1000000000; + +SET hive.vectorized.execution.enabled=true; +set hive.vectorized.execution.mapjoin.native.fast.hashtable.enabled=true; + +set hive.stats.dbclass=fs; +set hive.stats.fetch.column.stats=true; +set datanucleus.cache.collections=false; + +set hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; + +set hive.stats.autogather=true; +set hive.stats.column.autogather=true; +set hive.compute.query.using.stats=true; +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; + +set hive.fetch.task.conversion=none; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +set hive.query.results.cache.enabled=false; + +SET mapred.min.split.size=1000; +SET mapred.max.split.size=5000; + +CREATE TABLE item_dim (key1 int, name string) stored as ORC; +CREATE TABLE orders_fact (key3 int, key2 int, dt timestamp) stored as ORC; +CREATE TABLE seller_dim (key4 int, sellername string) stored as ORC; + +INSERT INTO item_dim values(101, "Item 101"); +INSERT INTO item_dim values(102, "Item 102"); +INSERT INTO item_dim values(103, "Item 103"); +INSERT INTO item_dim values(104, "Item 104"); +INSERT INTO item_dim values(105, "Item 105"); + +INSERT INTO orders_fact values(12345, 101, '2001-01-30 00:00:00'); +INSERT INTO orders_fact values(23456, 104, '2002-02-30 00:00:00'); +INSERT INTO orders_fact values(34567, 108, '2003-03-30 00:00:00'); +INSERT INTO orders_fact values(45678, 102, '2004-04-30 00:00:00'); +INSERT INTO orders_fact values(56789, 109, '2005-05-30 00:00:00'); +INSERT INTO orders_fact values(67891, 110, '2006-06-30 00:00:00'); + +-- Less cardinality than item_dim +INSERT INTO seller_dim values(12345, "Seller 1"); +INSERT INTO seller_dim values(23456, "Seller 2"); + +SET hive.optimize.scan.probedecode=true; + +EXPLAIN VECTORIZATION DETAIL select key1, key2, key3, key4, sellername, name, dt from orders_fact join item_dim on (orders_fact.key2 = item_dim.key1) join seller_dim on (orders_fact.key3 = seller_dim.key4); + +-- two keys match, the remaining rows can be skipped +select key1, key2, key3, key4, sellername, name, dt from orders_fact join item_dim on (orders_fact.key2 = item_dim.key1) join seller_dim on (orders_fact.key3 = seller_dim.key4); diff --git a/ql/src/test/results/clientpositive/llap/probedecode_mapjoin_simple.q.out b/ql/src/test/results/clientpositive/llap/probedecode_mapjoin_simple.q.out new file mode 100644 index 0000000000..cd4dafb44e --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/probedecode_mapjoin_simple.q.out @@ -0,0 +1,282 @@ +PREHOOK: query: CREATE TABLE item_dim (key1 int, name string) stored as ORC +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@item_dim +POSTHOOK: query: CREATE TABLE item_dim (key1 int, name string) stored as ORC +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@item_dim +PREHOOK: query: CREATE TABLE orders_fact (nokey int, key2 int, dt timestamp) stored as ORC +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orders_fact +POSTHOOK: query: CREATE TABLE orders_fact (nokey int, key2 int, dt timestamp) stored as ORC +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orders_fact +PREHOOK: query: INSERT INTO 
item_dim values(101, "Item 101") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@item_dim +POSTHOOK: query: INSERT INTO item_dim values(101, "Item 101") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@item_dim +POSTHOOK: Lineage: item_dim.key1 SCRIPT [] +POSTHOOK: Lineage: item_dim.name SCRIPT [] +PREHOOK: query: INSERT INTO item_dim values(102, "Item 102") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@item_dim +POSTHOOK: query: INSERT INTO item_dim values(102, "Item 102") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@item_dim +POSTHOOK: Lineage: item_dim.key1 SCRIPT [] +POSTHOOK: Lineage: item_dim.name SCRIPT [] +PREHOOK: query: INSERT INTO orders_fact values(12345, 101, '2001-01-30 00:00:00') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@orders_fact +POSTHOOK: query: INSERT INTO orders_fact values(12345, 101, '2001-01-30 00:00:00') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@orders_fact +POSTHOOK: Lineage: orders_fact.dt SCRIPT [] +POSTHOOK: Lineage: orders_fact.key2 SCRIPT [] +POSTHOOK: Lineage: orders_fact.nokey SCRIPT [] +PREHOOK: query: INSERT INTO orders_fact values(23456, 104, '2002-02-30 00:00:00') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@orders_fact +POSTHOOK: query: INSERT INTO orders_fact values(23456, 104, '2002-02-30 00:00:00') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@orders_fact +POSTHOOK: Lineage: orders_fact.dt SCRIPT [] +POSTHOOK: Lineage: orders_fact.key2 SCRIPT [] +POSTHOOK: Lineage: orders_fact.nokey SCRIPT [] +PREHOOK: query: INSERT INTO orders_fact values(34567, 108, '2003-03-30 00:00:00') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@orders_fact +POSTHOOK: query: INSERT INTO orders_fact values(34567, 108, '2003-03-30 00:00:00') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@orders_fact +POSTHOOK: Lineage: orders_fact.dt SCRIPT [] +POSTHOOK: Lineage: orders_fact.key2 SCRIPT [] +POSTHOOK: Lineage: orders_fact.nokey SCRIPT [] +PREHOOK: query: INSERT INTO orders_fact values(45678, 102, '2004-04-30 00:00:00') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@orders_fact +POSTHOOK: query: INSERT INTO orders_fact values(45678, 102, '2004-04-30 00:00:00') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@orders_fact +POSTHOOK: Lineage: orders_fact.dt SCRIPT [] +POSTHOOK: Lineage: orders_fact.key2 SCRIPT [] +POSTHOOK: Lineage: orders_fact.nokey SCRIPT [] +PREHOOK: query: INSERT INTO orders_fact values(56789, 109, '2005-05-30 00:00:00') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@orders_fact +POSTHOOK: query: INSERT INTO orders_fact values(56789, 109, '2005-05-30 00:00:00') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@orders_fact +POSTHOOK: Lineage: orders_fact.dt SCRIPT [] +POSTHOOK: Lineage: orders_fact.key2 SCRIPT [] +POSTHOOK: Lineage: orders_fact.nokey SCRIPT [] +PREHOOK: query: INSERT INTO orders_fact values(67891, 110, '2006-06-30 00:00:00') +PREHOOK: type: QUERY +PREHOOK: Input: 
_dummy_database@_dummy_table +PREHOOK: Output: default@orders_fact +POSTHOOK: query: INSERT INTO orders_fact values(67891, 110, '2006-06-30 00:00:00') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@orders_fact +POSTHOOK: Lineage: orders_fact.dt SCRIPT [] +POSTHOOK: Lineage: orders_fact.key2 SCRIPT [] +POSTHOOK: Lineage: orders_fact.nokey SCRIPT [] +PREHOOK: query: EXPLAIN VECTORIZATION DETAIL select key1, key2, name, dt from orders_fact join item_dim on (orders_fact.key2 = item_dim.key1) +PREHOOK: type: QUERY +PREHOOK: Input: default@item_dim +PREHOOK: Input: default@orders_fact +#### A masked pattern was here #### +POSTHOOK: query: EXPLAIN VECTORIZATION DETAIL select key1, key2, name, dt from orders_fact join item_dim on (orders_fact.key2 = item_dim.key1) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@item_dim +POSTHOOK: Input: default@orders_fact +#### A masked pattern was here #### +PLAN VECTORIZATION: + enabled: true + enabledConditionsMet: [hive.vectorized.execution.enabled IS true] + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Map 2 (BROADCAST_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: orders_fact + filterExpr: key2 is not null (type: boolean) + probeDecodeDetails: cacheKey:HASH_MAP_MAPJOIN_25_container, bigKeyColName:_col0, smallTablePos:1, keyRatio:1.0 + Statistics: Num rows: 6 Data size: 264 Basic stats: COMPLETE Column stats: NONE + TableScan Vectorization: + native: true + vectorizationSchemaColumns: [0:nokey:int, 1:key2:int, 2:dt:timestamp, 3:ROW__ID:struct] + Filter Operator + Filter Vectorization: + className: VectorFilterOperator + native: true + predicateExpression: SelectColumnIsNotNull(col 1:int) + predicate: key2 is not null (type: boolean) + Statistics: Num rows: 6 Data size: 264 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key2 (type: int), dt (type: timestamp) + outputColumnNames: _col0, _col1 + Select Vectorization: + className: VectorSelectOperator + native: true + projectedOutputColumnNums: [1, 2] + Statistics: Num rows: 6 Data size: 264 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + Map Join Vectorization: + bigTableKeyColumns: 1:int + bigTableRetainColumnNums: [1, 2] + bigTableValueColumns: 1:int, 2:timestamp + className: VectorMapJoinInnerLongOperator + native: true + nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Fast Hash Table and No Hybrid Hash Join IS true + nonOuterSmallTableKeyMapping: [] + projectedOutput: 1:int, 2:timestamp, 1:int, 4:string + smallTableValueMapping: 4:string + hashTableImplementationType: FAST + outputColumnNames: _col0, _col1, _col2, _col3 + input vertices: + 1 Map 2 + Statistics: Num rows: 6 Data size: 290 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col2 (type: int), _col0 (type: int), _col3 (type: string), _col1 (type: timestamp) + outputColumnNames: _col0, _col1, _col2, _col3 + Select Vectorization: + className: VectorSelectOperator + native: true + projectedOutputColumnNums: [1, 1, 4, 2] + Statistics: Num rows: 6 Data 
size: 290 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + File Sink Vectorization: + className: VectorFileSinkOperator + native: false + Statistics: Num rows: 6 Data size: 290 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: vectorized, llap + LLAP IO: all inputs + Map Vectorization: + enabled: true + enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true + inputFormatFeatureSupport: [DECIMAL_64] + featureSupportInUse: [DECIMAL_64] + inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + allNative: false + usesVectorUDFAdaptor: false + vectorized: true + rowBatchContext: + dataColumnCount: 3 + includeColumns: [1, 2] + dataColumns: nokey:int, key2:int, dt:timestamp + partitionColumnCount: 0 + scratchColumnTypeNames: [string] + Map 2 + Map Operator Tree: + TableScan + alias: item_dim + filterExpr: key1 is not null (type: boolean) + Statistics: Num rows: 2 Data size: 376 Basic stats: COMPLETE Column stats: NONE + TableScan Vectorization: + native: true + vectorizationSchemaColumns: [0:key1:int, 1:name:string, 2:ROW__ID:struct] + Filter Operator + Filter Vectorization: + className: VectorFilterOperator + native: true + predicateExpression: SelectColumnIsNotNull(col 0:int) + predicate: key1 is not null (type: boolean) + Statistics: Num rows: 2 Data size: 376 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key1 (type: int), name (type: string) + outputColumnNames: _col0, _col1 + Select Vectorization: + className: VectorSelectOperator + native: true + projectedOutputColumnNums: [0, 1] + Statistics: Num rows: 2 Data size: 376 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Reduce Sink Vectorization: + className: VectorReduceSinkLongOperator + keyColumns: 0:int + native: true + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true + valueColumns: 1:string + Statistics: Num rows: 2 Data size: 376 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) + Execution mode: vectorized, llap + LLAP IO: all inputs + Map Vectorization: + enabled: true + enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true + inputFormatFeatureSupport: [DECIMAL_64] + featureSupportInUse: [DECIMAL_64] + inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + allNative: true + usesVectorUDFAdaptor: false + vectorized: true + rowBatchContext: + dataColumnCount: 2 + includeColumns: [0, 1] + dataColumns: key1:int, name:string + partitionColumnCount: 0 + scratchColumnTypeNames: [] + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select key1, key2, name, dt from orders_fact join item_dim on (orders_fact.key2 = item_dim.key1) +PREHOOK: type: QUERY +PREHOOK: Input: default@item_dim +PREHOOK: Input: default@orders_fact +#### A masked pattern was here #### +POSTHOOK: query: select key1, key2, name, dt from orders_fact join item_dim on (orders_fact.key2 = item_dim.key1) +POSTHOOK: type: 
QUERY +POSTHOOK: Input: default@item_dim +POSTHOOK: Input: default@orders_fact +#### A masked pattern was here #### +102 102 Item 102 2004-04-30 00:00:00 +101 101 Item 101 2001-01-30 00:00:00 diff --git a/ql/src/test/results/clientpositive/llap/probedecode_mapjoin_stats.q.out b/ql/src/test/results/clientpositive/llap/probedecode_mapjoin_stats.q.out new file mode 100644 index 0000000000..5fa8ea270f --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/probedecode_mapjoin_stats.q.out @@ -0,0 +1,419 @@ +PREHOOK: query: CREATE TABLE item_dim (key1 int, name string) stored as ORC +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@item_dim +POSTHOOK: query: CREATE TABLE item_dim (key1 int, name string) stored as ORC +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@item_dim +PREHOOK: query: CREATE TABLE orders_fact (key3 int, key2 int, dt timestamp) stored as ORC +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orders_fact +POSTHOOK: query: CREATE TABLE orders_fact (key3 int, key2 int, dt timestamp) stored as ORC +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orders_fact +PREHOOK: query: CREATE TABLE seller_dim (key4 int, sellername string) stored as ORC +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@seller_dim +POSTHOOK: query: CREATE TABLE seller_dim (key4 int, sellername string) stored as ORC +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@seller_dim +PREHOOK: query: INSERT INTO item_dim values(101, "Item 101") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@item_dim +POSTHOOK: query: INSERT INTO item_dim values(101, "Item 101") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@item_dim +POSTHOOK: Lineage: item_dim.key1 SCRIPT [] +POSTHOOK: Lineage: item_dim.name SCRIPT [] +PREHOOK: query: INSERT INTO item_dim values(102, "Item 102") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@item_dim +POSTHOOK: query: INSERT INTO item_dim values(102, "Item 102") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@item_dim +POSTHOOK: Lineage: item_dim.key1 SCRIPT [] +POSTHOOK: Lineage: item_dim.name SCRIPT [] +PREHOOK: query: INSERT INTO item_dim values(103, "Item 103") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@item_dim +POSTHOOK: query: INSERT INTO item_dim values(103, "Item 103") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@item_dim +POSTHOOK: Lineage: item_dim.key1 SCRIPT [] +POSTHOOK: Lineage: item_dim.name SCRIPT [] +PREHOOK: query: INSERT INTO item_dim values(104, "Item 104") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@item_dim +POSTHOOK: query: INSERT INTO item_dim values(104, "Item 104") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@item_dim +POSTHOOK: Lineage: item_dim.key1 SCRIPT [] +POSTHOOK: Lineage: item_dim.name SCRIPT [] +PREHOOK: query: INSERT INTO item_dim values(105, "Item 105") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@item_dim +POSTHOOK: query: INSERT INTO item_dim values(105, "Item 105") +POSTHOOK: 
type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@item_dim +POSTHOOK: Lineage: item_dim.key1 SCRIPT [] +POSTHOOK: Lineage: item_dim.name SCRIPT [] +PREHOOK: query: INSERT INTO orders_fact values(12345, 101, '2001-01-30 00:00:00') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@orders_fact +POSTHOOK: query: INSERT INTO orders_fact values(12345, 101, '2001-01-30 00:00:00') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@orders_fact +POSTHOOK: Lineage: orders_fact.dt SCRIPT [] +POSTHOOK: Lineage: orders_fact.key2 SCRIPT [] +POSTHOOK: Lineage: orders_fact.key3 SCRIPT [] +PREHOOK: query: INSERT INTO orders_fact values(23456, 104, '2002-02-30 00:00:00') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@orders_fact +POSTHOOK: query: INSERT INTO orders_fact values(23456, 104, '2002-02-30 00:00:00') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@orders_fact +POSTHOOK: Lineage: orders_fact.dt SCRIPT [] +POSTHOOK: Lineage: orders_fact.key2 SCRIPT [] +POSTHOOK: Lineage: orders_fact.key3 SCRIPT [] +PREHOOK: query: INSERT INTO orders_fact values(34567, 108, '2003-03-30 00:00:00') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@orders_fact +POSTHOOK: query: INSERT INTO orders_fact values(34567, 108, '2003-03-30 00:00:00') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@orders_fact +POSTHOOK: Lineage: orders_fact.dt SCRIPT [] +POSTHOOK: Lineage: orders_fact.key2 SCRIPT [] +POSTHOOK: Lineage: orders_fact.key3 SCRIPT [] +PREHOOK: query: INSERT INTO orders_fact values(45678, 102, '2004-04-30 00:00:00') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@orders_fact +POSTHOOK: query: INSERT INTO orders_fact values(45678, 102, '2004-04-30 00:00:00') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@orders_fact +POSTHOOK: Lineage: orders_fact.dt SCRIPT [] +POSTHOOK: Lineage: orders_fact.key2 SCRIPT [] +POSTHOOK: Lineage: orders_fact.key3 SCRIPT [] +PREHOOK: query: INSERT INTO orders_fact values(56789, 109, '2005-05-30 00:00:00') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@orders_fact +POSTHOOK: query: INSERT INTO orders_fact values(56789, 109, '2005-05-30 00:00:00') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@orders_fact +POSTHOOK: Lineage: orders_fact.dt SCRIPT [] +POSTHOOK: Lineage: orders_fact.key2 SCRIPT [] +POSTHOOK: Lineage: orders_fact.key3 SCRIPT [] +PREHOOK: query: INSERT INTO orders_fact values(67891, 110, '2006-06-30 00:00:00') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@orders_fact +POSTHOOK: query: INSERT INTO orders_fact values(67891, 110, '2006-06-30 00:00:00') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@orders_fact +POSTHOOK: Lineage: orders_fact.dt SCRIPT [] +POSTHOOK: Lineage: orders_fact.key2 SCRIPT [] +POSTHOOK: Lineage: orders_fact.key3 SCRIPT [] +PREHOOK: query: INSERT INTO seller_dim values(12345, "Seller 1") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@seller_dim +POSTHOOK: query: INSERT INTO seller_dim values(12345, "Seller 1") 
+POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@seller_dim +POSTHOOK: Lineage: seller_dim.key4 SCRIPT [] +POSTHOOK: Lineage: seller_dim.sellername SCRIPT [] +PREHOOK: query: INSERT INTO seller_dim values(23456, "Seller 2") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@seller_dim +POSTHOOK: query: INSERT INTO seller_dim values(23456, "Seller 2") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@seller_dim +POSTHOOK: Lineage: seller_dim.key4 SCRIPT [] +POSTHOOK: Lineage: seller_dim.sellername SCRIPT [] +PREHOOK: query: EXPLAIN VECTORIZATION DETAIL select key1, key2, key3, key4, sellername, name, dt from orders_fact join item_dim on (orders_fact.key2 = item_dim.key1) join seller_dim on (orders_fact.key3 = seller_dim.key4) +PREHOOK: type: QUERY +PREHOOK: Input: default@item_dim +PREHOOK: Input: default@orders_fact +PREHOOK: Input: default@seller_dim +#### A masked pattern was here #### +POSTHOOK: query: EXPLAIN VECTORIZATION DETAIL select key1, key2, key3, key4, sellername, name, dt from orders_fact join item_dim on (orders_fact.key2 = item_dim.key1) join seller_dim on (orders_fact.key3 = seller_dim.key4) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@item_dim +POSTHOOK: Input: default@orders_fact +POSTHOOK: Input: default@seller_dim +#### A masked pattern was here #### +PLAN VECTORIZATION: + enabled: true + enabledConditionsMet: [hive.vectorized.execution.enabled IS true] + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Map 2 (BROADCAST_EDGE), Map 3 (BROADCAST_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: orders_fact + filterExpr: (key2 is not null and key3 is not null) (type: boolean) + probeDecodeDetails: cacheKey:HASH_MAP_MAPJOIN_46_container, bigKeyColName:_col0, smallTablePos:1, keyRatio:0.16666666666666666 + Statistics: Num rows: 6 Data size: 288 Basic stats: COMPLETE Column stats: COMPLETE + TableScan Vectorization: + native: true + vectorizationSchemaColumns: [0:key3:int, 1:key2:int, 2:dt:timestamp, 3:ROW__ID:struct] + Filter Operator + Filter Vectorization: + className: VectorFilterOperator + native: true + predicateExpression: FilterExprAndExpr(children: SelectColumnIsNotNull(col 1:int), SelectColumnIsNotNull(col 0:int)) + predicate: (key2 is not null and key3 is not null) (type: boolean) + Statistics: Num rows: 6 Data size: 288 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key3 (type: int), key2 (type: int), dt (type: timestamp) + outputColumnNames: _col0, _col1, _col2 + Select Vectorization: + className: VectorSelectOperator + native: true + projectedOutputColumnNums: [0, 1, 2] + Statistics: Num rows: 6 Data size: 288 Basic stats: COMPLETE Column stats: COMPLETE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col1 (type: int) + 1 _col0 (type: int) + Map Join Vectorization: + bigTableKeyColumns: 1:int + bigTableRetainColumnNums: [0, 1, 2] + bigTableValueColumns: 0:int, 1:int, 2:timestamp + className: VectorMapJoinInnerLongOperator + native: true + nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS 
true, Fast Hash Table and No Hybrid Hash Join IS true + nonOuterSmallTableKeyMapping: [] + projectedOutput: 0:int, 1:int, 2:timestamp, 1:int, 4:string + smallTableValueMapping: 4:string + hashTableImplementationType: FAST + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + input vertices: + 1 Map 2 + Statistics: Num rows: 5 Data size: 720 Basic stats: COMPLETE Column stats: COMPLETE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + Map Join Vectorization: + bigTableKeyColumns: 0:int + bigTableRetainColumnNums: [0, 1, 2, 4] + bigTableValueColumns: 0:int, 1:int, 2:timestamp, 1:int, 4:string + className: VectorMapJoinInnerLongOperator + native: true + nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Fast Hash Table and No Hybrid Hash Join IS true + nonOuterSmallTableKeyMapping: [] + projectedOutput: 0:int, 1:int, 2:timestamp, 1:int, 4:string, 0:int, 5:string + smallTableValueMapping: 5:string + hashTableImplementationType: FAST + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6 + input vertices: + 1 Map 3 + Statistics: Num rows: 2 Data size: 480 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col3 (type: int), _col1 (type: int), _col0 (type: int), _col5 (type: int), _col6 (type: string), _col4 (type: string), _col2 (type: timestamp) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6 + Select Vectorization: + className: VectorSelectOperator + native: true + projectedOutputColumnNums: [1, 1, 0, 0, 5, 4, 2] + Statistics: Num rows: 2 Data size: 480 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + File Sink Vectorization: + className: VectorFileSinkOperator + native: false + Statistics: Num rows: 2 Data size: 480 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: vectorized, llap + LLAP IO: all inputs + Map Vectorization: + enabled: true + enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true + inputFormatFeatureSupport: [DECIMAL_64] + featureSupportInUse: [DECIMAL_64] + inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + allNative: false + usesVectorUDFAdaptor: false + vectorized: true + rowBatchContext: + dataColumnCount: 3 + includeColumns: [0, 1, 2] + dataColumns: key3:int, key2:int, dt:timestamp + partitionColumnCount: 0 + scratchColumnTypeNames: [string, string] + Map 2 + Map Operator Tree: + TableScan + alias: item_dim + filterExpr: key1 is not null (type: boolean) + Statistics: Num rows: 5 Data size: 480 Basic stats: COMPLETE Column stats: COMPLETE + TableScan Vectorization: + native: true + vectorizationSchemaColumns: [0:key1:int, 1:name:string, 2:ROW__ID:struct] + Filter Operator + Filter Vectorization: + className: VectorFilterOperator + native: true + predicateExpression: SelectColumnIsNotNull(col 0:int) + predicate: key1 is not null (type: boolean) + Statistics: Num rows: 5 Data size: 480 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key1 (type: int), name (type: string) + outputColumnNames: _col0, _col1 + Select Vectorization: 
+ className: VectorSelectOperator + native: true + projectedOutputColumnNums: [0, 1] + Statistics: Num rows: 5 Data size: 480 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Reduce Sink Vectorization: + className: VectorReduceSinkLongOperator + keyColumns: 0:int + native: true + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true + valueColumns: 1:string + Statistics: Num rows: 5 Data size: 480 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string) + Execution mode: vectorized, llap + LLAP IO: all inputs + Map Vectorization: + enabled: true + enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true + inputFormatFeatureSupport: [DECIMAL_64] + featureSupportInUse: [DECIMAL_64] + inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + allNative: true + usesVectorUDFAdaptor: false + vectorized: true + rowBatchContext: + dataColumnCount: 2 + includeColumns: [0, 1] + dataColumns: key1:int, name:string + partitionColumnCount: 0 + scratchColumnTypeNames: [] + Map 3 + Map Operator Tree: + TableScan + alias: seller_dim + filterExpr: key4 is not null (type: boolean) + Statistics: Num rows: 2 Data size: 192 Basic stats: COMPLETE Column stats: COMPLETE + TableScan Vectorization: + native: true + vectorizationSchemaColumns: [0:key4:int, 1:sellername:string, 2:ROW__ID:struct] + Filter Operator + Filter Vectorization: + className: VectorFilterOperator + native: true + predicateExpression: SelectColumnIsNotNull(col 0:int) + predicate: key4 is not null (type: boolean) + Statistics: Num rows: 2 Data size: 192 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key4 (type: int), sellername (type: string) + outputColumnNames: _col0, _col1 + Select Vectorization: + className: VectorSelectOperator + native: true + projectedOutputColumnNums: [0, 1] + Statistics: Num rows: 2 Data size: 192 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Reduce Sink Vectorization: + className: VectorReduceSinkLongOperator + keyColumns: 0:int + native: true + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true + valueColumns: 1:string + Statistics: Num rows: 2 Data size: 192 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string) + Execution mode: vectorized, llap + LLAP IO: all inputs + Map Vectorization: + enabled: true + enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true + inputFormatFeatureSupport: [DECIMAL_64] + featureSupportInUse: [DECIMAL_64] + inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + allNative: true + usesVectorUDFAdaptor: false + vectorized: true + rowBatchContext: + dataColumnCount: 2 + includeColumns: [0, 1] + dataColumns: key4:int, sellername:string + partitionColumnCount: 0 + scratchColumnTypeNames: [] + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + 
ListSink + +PREHOOK: query: select key1, key2, key3, key4, sellername, name, dt from orders_fact join item_dim on (orders_fact.key2 = item_dim.key1) join seller_dim on (orders_fact.key3 = seller_dim.key4) +PREHOOK: type: QUERY +PREHOOK: Input: default@item_dim +PREHOOK: Input: default@orders_fact +PREHOOK: Input: default@seller_dim +#### A masked pattern was here #### +POSTHOOK: query: select key1, key2, key3, key4, sellername, name, dt from orders_fact join item_dim on (orders_fact.key2 = item_dim.key1) join seller_dim on (orders_fact.key3 = seller_dim.key4) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@item_dim +POSTHOOK: Input: default@orders_fact +POSTHOOK: Input: default@seller_dim +#### A masked pattern was here #### +101 101 12345 12345 Seller 1 Item 101 2001-01-30 00:00:00 +104 104 23456 23456 Seller 2 Item 104 2002-03-02 00:00:00 -- 2.20.1 (Apple Git-117)