diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 726522bd94..31c866a5a9 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2885,6 +2885,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Big table for runtime filteting should be of atleast this size"), TEZ_DYNAMIC_SEMIJOIN_REDUCTION_THRESHOLD("hive.tez.dynamic.semijoin.reduction.threshold", (float) 0.50, "Only perform semijoin optimization if the estimated benefit at or above this fraction of the target table"), + TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY("hive.tez.dynamic.semijoin.reduction.hint.only", false, + "When true, only enforce semijoin when a hint is provided"), TEZ_SMB_NUMBER_WAVES( "hive.tez.smb.number.waves", (float) 0.5, diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index fb85b9ece2..2ed704d188 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -569,6 +569,7 @@ minillaplocal.query.files=acid_globallimit.q,\ schema_evol_text_vecrow_table.q,\ selectDistinctStar.q,\ semijoin.q,\ + semijoin_hint.q,\ smb_cache.q,\ special_character_in_tabnames_1.q,\ sqlmerge.q,\ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index db6b05b7ac..637bc54fbd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -794,7 +794,7 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo // The semijoin branch can potentially create a task level cycle // with the hashjoin except when it is dynamically partitioned hash // join which takes place in a separate task. 
- if (context.parseContext.getRsOpToTsOpMap().size() > 0 + if (context.parseContext.getRsToSemiJoinBranchInfo().size() > 0 && removeReduceSink) { removeCycleCreatingSemiJoinOps(mapJoinOp, parentSelectOpOfBigTableOp, context.parseContext); @@ -826,7 +826,7 @@ private void removeCycleCreatingSemiJoinOps(MapJoinOperator mapjoinOp, } ReduceSinkOperator rs = (ReduceSinkOperator) op; - TableScanOperator ts = parseContext.getRsOpToTsOpMap().get(rs); + TableScanOperator ts = parseContext.getRsToSemiJoinBranchInfo().get(rs).getTsOp(); if (ts == null) { // skip, no semijoin branch continue; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java index 838cc69cb6..95bf73302e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java @@ -43,12 +43,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; -import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; -import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; -import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo; -import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.*; import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext; import org.apache.hadoop.hive.ql.plan.*; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator; @@ -173,6 +168,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Obje for (DynamicListContext ctx : removerContext) { ExprNodeDesc constNode = new ExprNodeConstantDesc(ctx.parent.getTypeInfo(), true); + // TODO: something adds not-null to filter on the way, so it's (not null and in (rs)); + // is it valid to remove it here? replaceExprNode(ctx, desc, constNode); } return false; @@ -214,17 +211,25 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Obje } } else { LOG.debug("Column " + column + " is not a partition column"); + Map hints = ctx.desc.getHints(); if (semiJoin && ts.getConf().getFilterExpr() != null) { - LOG.debug("Initiate semijoin reduction for " + column); + LOG.debug("Initiate semijoin reduction for " + column + " (" + + ts.getConf().getFilterExpr().getExprString()); // Get the table name from which the min-max values will come. Operator op = ctx.generator; + while (!(op == null || op instanceof TableScanOperator)) { op = op.getParentOperators().get(0); } String tableAlias = (op == null ? "" : ((TableScanOperator) op).getConf().getAlias()); + SemiJoinHint sjHint = (hints != null) ? 
hints.get(tableAlias) : null; keyBaseAlias = ctx.generator.getOperatorId() + "_" + tableAlias + "_" + column; - semiJoinAttempted = generateSemiJoinOperatorPlan(ctx, parseContext, ts, keyBaseAlias); + semiJoinAttempted = generateSemiJoinOperatorPlan( + ctx, parseContext, ts, keyBaseAlias, sjHint); + if (!semiJoinAttempted && sjHint != null) { + throw new SemanticException("The semijoin hint could not be enforced because the required conditions were not met"); + } } } @@ -387,7 +392,13 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars // Generates plan for min/max when dynamic partition pruning is ruled out. private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContext parseContext, - TableScanOperator ts, String keyBaseAlias) throws SemanticException { + TableScanOperator ts, String keyBaseAlias, SemiJoinHint sjHint) throws SemanticException { + + // If only hinted semijoins are allowed, bail out when no hint was provided + if (parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY) + && sjHint == null) { + return false; + } // we will put a fork in the plan at the source of the reduce sink Operator parentOfRS = ctx.generator.getParentOperators().get(0); @@ -441,6 +452,24 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex } } + // If hint is provided and only hinted semijoin optimizations should be + // created, then skip other columns on the table + if (parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY) + && sjHint.getColName() != null && + !internalColName.equals(sjHint.getColName())) { + return false; + } + + // Check if there already exists a semijoin branch + GroupByOperator gb = parseContext.getColExprToGBMap().get(key); + if (gb != null) { + // Already an existing semijoin branch, reuse it + createFinalRsForSemiJoinOp(parseContext, ts, gb, key, keyBaseAlias, + ctx.parent.getChildren().get(0), sjHint != null); + // done!
+ return true; + } + List keyExprs = new ArrayList(); keyExprs.add(key); @@ -484,8 +513,6 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex HiveConf.getFloatVar(parseContext.getConf(), HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); - ArrayList groupByExprs = new ArrayList(); - // Add min/max and bloom filter aggregations List aggFnOIs = new ArrayList(); aggFnOIs.add(key.getWritableObjectInspector()); @@ -505,8 +532,14 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex AggregationDesc bloomFilter = new AggregationDesc("bloom_filter", FunctionRegistry.getGenericUDAFEvaluator("bloom_filter", aggFnOIs, false, false), params, false, Mode.PARTIAL1); - GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator(); + GenericUDAFBloomFilterEvaluator bloomFilterEval = + (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator(); bloomFilterEval.setSourceOperator(selectOp); + + if (sjHint != null && sjHint.getNumEntries() > 0) { + LOG.debug("Setting size for " + keyBaseAlias + " to " + sjHint.getNumEntries() + " based on the hint"); + bloomFilterEval.setHintEntries(sjHint.getNumEntries()); + } bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES)); bloomFilterEval.setMinEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MIN_BLOOM_FILTER_ENTRIES)); bloomFilterEval.setFactor(parseContext.getConf().getFloatVar(ConfVars.TEZ_BLOOM_FILTER_FACTOR)); @@ -635,23 +668,56 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex rsOp.getConf().setOutputOperators(outputOperators); } + createFinalRsForSemiJoinOp(parseContext, ts, groupByOpFinal, key, + keyBaseAlias, ctx.parent.getChildren().get(0), sjHint != null); + + return true; + } + + private void createFinalRsForSemiJoinOp( + ParseContext parseContext, TableScanOperator ts, GroupByOperator gb, + ExprNodeDesc key, String keyBaseAlias, ExprNodeDesc colExpr, + boolean isHint) throws SemanticException { + ArrayList gbOutputNames = new ArrayList<>(); + // One each for min, max and bloom filter + gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(0)); + gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(1)); + gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(2)); + + int colPos = 0; + ArrayList rsValueCols = new ArrayList(); + for (int i = 0; i < gbOutputNames.size() - 1; i++) { + ExprNodeColumnDesc expr = new ExprNodeColumnDesc(key.getTypeInfo(), + gbOutputNames.get(colPos++), "", false); + rsValueCols.add(expr); + } + + // Bloom Filter uses binary + ExprNodeColumnDesc colBFExpr = new ExprNodeColumnDesc(TypeInfoFactory.binaryTypeInfo, + gbOutputNames.get(colPos++), "", false); + rsValueCols.add(colBFExpr); + // Create the final Reduce Sink Operator ReduceSinkDesc rsDescFinal = PlanUtils.getReduceSinkDesc( new ArrayList(), rsValueCols, gbOutputNames, false, -1, 0, 1, Operation.NOT_ACID); ReduceSinkOperator rsOpFinal = (ReduceSinkOperator)OperatorFactory.getAndMakeChild( - rsDescFinal, new RowSchema(groupByOpFinal.getSchema()), groupByOpFinal); + rsDescFinal, new RowSchema(gb.getSchema()), gb); + Map columnExprMap = new HashMap<>(); rsOpFinal.setColumnExprMap(columnExprMap); - LOG.debug("DynamicMinMaxPushdown: Saving RS to TS mapping: " + rsOpFinal + ": " + ts); - parseContext.getRsOpToTsOpMap().put(rsOpFinal, ts); + LOG.debug("DynamicSemiJoinPushdown: Saving RS to TS mapping: " + rsOpFinal + ": " + ts); + SemiJoinBranchInfo 
sjInfo = new SemiJoinBranchInfo(ts, isHint); + parseContext.getRsToSemiJoinBranchInfo().put(rsOpFinal, sjInfo); // for explain purpose - if (parseContext.getContext().getExplainConfig() != null - && parseContext.getContext().getExplainConfig().isFormatted()) { - List outputOperators = new ArrayList<>(); + if (parseContext.getContext().getExplainConfig() != null && + parseContext.getContext().getExplainConfig().isFormatted()) { + List outputOperators = rsOpFinal.getConf().getOutputOperators(); + if (outputOperators == null) { + outputOperators = new ArrayList<>(); + } outputOperators.add(ts.getOperatorId()); - rsOpFinal.getConf().setOutputOperators(outputOperators); } // Save the info that is required at query time to resolve dynamic/runtime values. @@ -666,10 +732,9 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex runtimeValuesInfo.setTableDesc(rsFinalTableDesc); runtimeValuesInfo.setDynamicValueIDs(dynamicValueIDs); runtimeValuesInfo.setColExprs(rsValueCols); - runtimeValuesInfo.setTsColExpr(ctx.parent.getChildren().get(0)); + runtimeValuesInfo.setTsColExpr(colExpr); parseContext.getRsToRuntimeValuesInfoMap().put(rsOpFinal, runtimeValuesInfo); - - return true; + parseContext.getColExprToGBMap().put(key, gb); } private Map collectDynamicPruningConditions(ExprNodeDesc pred, NodeProcessorCtx ctx) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java index 73a9b0f34d..d375d1b58d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.optimizer.calcite.translator; +import org.apache.hadoop.hive.ql.parse.*; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -72,19 +74,8 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion; -import org.apache.hadoop.hive.ql.parse.JoinCond; -import org.apache.hadoop.hive.ql.parse.JoinType; -import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec; import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression; import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression; -import org.apache.hadoop.hive.ql.parse.PTFTranslator; -import org.apache.hadoop.hive.ql.parse.ParseUtils; -import org.apache.hadoop.hive.ql.parse.RowResolver; -import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.parse.UnparseTranslator; -import org.apache.hadoop.hive.ql.parse.WindowingComponentizer; -import org.apache.hadoop.hive.ql.parse.WindowingSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -348,6 +339,9 @@ private OpAttr translateJoin(RelNode joinRel) throws SemanticException { // through Hive String[] baseSrc = new String[joinRel.getInputs().size()]; String tabAlias = getHiveDerivedTableAlias(); + Map semiJoinHints = semanticAnalyzer.parseSemiJoinHint( + semanticAnalyzer.getQB().getParseInfo().getHints()); + // 1. 
Convert inputs OpAttr[] inputs = new OpAttr[joinRel.getInputs().size()]; List> children = new ArrayList>(joinRel.getInputs().size()); @@ -413,7 +407,7 @@ private OpAttr translateJoin(RelNode joinRel) throws SemanticException { // 6. Generate Join operator JoinOperator joinOp = genJoin(joinRel, joinExpressions, filterExpressions, children, - baseSrc, tabAlias); + baseSrc, tabAlias, semiJoinHints); // 7. Return result return new OpAttr(tabAlias, newVcolsInCalcite, joinOp); @@ -726,7 +720,7 @@ private static SelectOperator genReduceSinkAndBacktrackSelect(Operator input, List keepColNames) throws SemanticException { // 1. Generate RS operator // 1.1 Prune the tableNames, only count the tableNames that are not empty strings - // as empty string in table aliases is only allowed for virtual columns. + // as empty string in table aliases is only allowed for virtual columns. String tableAlias = null; Set tableNames = input.getSchema().getTableNames(); for (String tableName : tableNames) { @@ -885,7 +879,8 @@ private static ReduceSinkOperator genReduceSink(Operator input, String tableA private static JoinOperator genJoin(RelNode join, ExprNodeDesc[][] joinExpressions, List> filterExpressions, List> children, - String[] baseSrc, String tabAlias) throws SemanticException { + String[] baseSrc, String tabAlias, Map semiJoinHints) + throws SemanticException { // 1. Extract join type JoinCondDesc[] joinCondns; @@ -1011,6 +1006,7 @@ private static JoinOperator genJoin(RelNode join, ExprNodeDesc[][] joinExpressio // 4. We create the join operator with its descriptor JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, noOuterJoin, joinCondns, filters, joinExpressions); + desc.setSemiJoinHints(semiJoinHints); desc.setReversedExprs(reversedExprs); desc.setFilterMap(filterMap); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 79662ec5f1..6d26d38468 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.parse; +import org.antlr.runtime.tree.Tree; + import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; @@ -328,6 +330,7 @@ Operator genOPTree(ASTNode ast, PlannerContext plannerCtx) throws SemanticExcept skipCalcitePlan = true; } else { PreCboCtx cboCtx = (PreCboCtx) plannerCtx; + ASTNode oldHints = getQB().getParseInfo().getHints(); // Note: for now, we don't actually pass the queryForCbo to CBO, because // it accepts qb, not AST, and can also access all the private stuff in @@ -395,6 +398,15 @@ Operator genOPTree(ASTNode ast, PlannerContext plannerCtx) throws SemanticExcept newAST = reAnalyzeCTASAfterCbo(newAST); } } + if (oldHints != null) { + if (getQB().getParseInfo().getHints() != null) { + LOG.warn("Hints are not null in the optimized tree; before CBO " + oldHints.dump() + + "; after CBO " + getQB().getParseInfo().getHints().dump()); + } else { + LOG.debug("Propagating hints to QB: " + oldHints); + getQB().getParseInfo().setHints(oldHints); + } + } Phase1Ctx ctx_1 = initPhase1Ctx(); if (!doPhase1(newAST, getQB(), ctx_1, null)) { throw new RuntimeException("Couldn't do phase1 on CBO optimized query plan"); @@ -3477,6 +3489,24 @@ private RelNode genSelectRelNode(List calciteColLst, RowResolver out_rw return selRel; } + private void setQueryHints(QB qb) throws SemanticException { + QBParseInfo qbp = 
getQBParseInfo(qb); + String selClauseName = qbp.getClauseNames().iterator().next(); + Tree selExpr0 = qbp.getSelForClause(selClauseName).getChild(0); + + if (selExpr0.getType() != HiveParser.QUERY_HINT) return; + String hint = ctx.getTokenRewriteStream().toString( + selExpr0.getTokenStartIndex(), selExpr0.getTokenStopIndex()); + LOG.debug("Handling query hints: " + hint); + ParseDriver pd = new ParseDriver(); + try { + ASTNode hintNode = pd.parseHint(hint); + qbp.setHints((ASTNode) hintNode); + } catch (ParseException e) { + throw new SemanticException("failed to parse query hint: "+e.getMessage(), e); + } + } + /** * NOTE: there can only be one select caluse since we don't handle multi * destination insert. @@ -3975,7 +4005,12 @@ private RelNode genLogicalPlan(QB qb, boolean outerMostQB, throw new CalciteSemanticException("Unsupported", UnsupportedFeature.Others); } + // 1.3 process join + // 1.3.1 process hints + setQueryHints(qb); + + // 1.3.2 process the actual join if (qb.getParseInfo().getJoinExpr() != null) { srcRel = genJoinLogicalPlan(qb.getParseInfo().getJoinExpr(), aliasToRel); } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index d58f4474b6..83e89af787 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -266,11 +266,14 @@ public static void removeUnionOperators(GenTezProcContext context, BaseWork work } } // This TableScanOperator could be part of semijoin optimization. - Map rsOpToTsOpMap = - context.parseContext.getRsOpToTsOpMap(); - for (ReduceSinkOperator rs : rsOpToTsOpMap.keySet()) { - if (rsOpToTsOpMap.get(rs) == orig) { - rsOpToTsOpMap.put(rs, (TableScanOperator) newRoot); + Map rsToSemiJoinBranchInfo = + context.parseContext.getRsToSemiJoinBranchInfo(); + for (ReduceSinkOperator rs : rsToSemiJoinBranchInfo.keySet()) { + SemiJoinBranchInfo sjInfo = rsToSemiJoinBranchInfo.get(rs); + if (sjInfo.getTsOp() == orig) { + SemiJoinBranchInfo newSJInfo = new SemiJoinBranchInfo( + (TableScanOperator)newRoot, sjInfo.getIsHint()); + rsToSemiJoinBranchInfo.put(rs, newSJInfo); } } } @@ -516,19 +519,18 @@ public static EdgeType determineEdgeType(BaseWork preceedingWork, BaseWork follo return EdgeType.SIMPLE_EDGE; } - public static void processDynamicMinMaxPushDownOperator( + public static void processDynamicSemiJoinPushDownOperator( GenTezProcContext procCtx, RuntimeValuesInfo runtimeValuesInfo, ReduceSinkOperator rs) throws SemanticException { - TableScanOperator ts = procCtx.parseContext.getRsOpToTsOpMap().get(rs); + SemiJoinBranchInfo sjInfo = procCtx.parseContext.getRsToSemiJoinBranchInfo().get(rs); List rsWorkList = procCtx.childToWorkMap.get(rs); - if (ts == null || rsWorkList == null) { + if (sjInfo == null || rsWorkList == null) { // This happens when the ReduceSink's edge has been removed by cycle // detection logic. Nothing to do here. 
return; } - LOG.debug("ResduceSink " + rs + " to TableScan " + ts); if (rsWorkList.size() != 1) { StringBuilder sb = new StringBuilder(); @@ -541,6 +543,9 @@ public static void processDynamicMinMaxPushDownOperator( throw new SemanticException(rs + " belongs to multiple BaseWorks: " + sb.toString()); } + TableScanOperator ts = sjInfo.getTsOp(); + LOG.debug("ResduceSink " + rs + " to TableScan " + ts); + BaseWork parentWork = rsWorkList.get(0); BaseWork childWork = procCtx.rootToWorkMap.get(ts); @@ -611,7 +616,7 @@ public static void removeSemiJoinOperator(ParseContext context, skip = true; } } - context.getRsOpToTsOpMap().remove(rs); + context.getRsToSemiJoinBranchInfo().remove(rs); } private static class DynamicValuePredicateContext implements NodeProcessorCtx { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g index 8e70a46235..e110fb33df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g @@ -31,6 +31,7 @@ tokens { TOK_MAPJOIN; TOK_STREAMTABLE; TOK_HINTARGLIST; + TOK_LEFTSEMIJOIN; } @header { @@ -69,6 +70,7 @@ hintItem hintName : KW_MAPJOIN -> TOK_MAPJOIN + | KW_SEMI -> TOK_LEFTSEMIJOIN | KW_STREAMTABLE -> TOK_STREAMTABLE ; @@ -80,4 +82,5 @@ hintArgs hintArgName : Identifier + | Number ; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index 3f9f76c57f..9b9769771e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -33,17 +33,7 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.QueryProperties; import org.apache.hadoop.hive.ql.QueryState; -import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; -import org.apache.hadoop.hive.ql.exec.FetchTask; -import org.apache.hadoop.hive.ql.exec.JoinOperator; -import org.apache.hadoop.hive.ql.exec.ListSinkOperator; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; -import org.apache.hadoop.hive.ql.exec.SelectOperator; -import org.apache.hadoop.hive.ql.exec.TableScanOperator; -import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.*; import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.Table; @@ -126,11 +116,12 @@ private boolean needViewColumnAuthorization; private Set acidFileSinks = Collections.emptySet(); - // Map to store mapping between reduce sink Operator and TS Operator for semijoin - private Map rsOpToTsOpMap = - new HashMap(); private Map rsToRuntimeValuesInfo = new HashMap(); + private Map colExprToGBMap = + new HashMap<>(); + private Map rsToSemiJoinBranchInfo = + new HashMap<>(); public ParseContext() { } @@ -666,11 +657,19 @@ public void setRsToRuntimeValuesInfoMap(Map rsOpToTsOpMap) { - this.rsOpToTsOpMap = rsOpToTsOpMap; + public void setColExprToGBMap(Map colExprToGBMap) { + this.colExprToGBMap = colExprToGBMap; } - public Map getRsOpToTsOpMap() { - return rsOpToTsOpMap; + public Map getColExprToGBMap() { + return colExprToGBMap; + } + + public void setRsToSemiJoinBranchInfo(Map rsToSemiJoinBranchInfo) { + this.rsToSemiJoinBranchInfo = rsToSemiJoinBranchInfo; + } + + public 
Map getRsToSemiJoinBranchInfo() { + return rsToSemiJoinBranchInfo; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java index ec76fb79c5..bcef2520f5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.parse; +import java.util.Arrays; + import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; @@ -82,6 +84,7 @@ * We then add a Filter Operator after the Join Operator for this QBJoinTree. */ private final List postJoinFilters; + private Map semiJoinHint; /** * constructor. @@ -429,4 +432,17 @@ public QBJoinTree clone() throws CloneNotSupportedException { return cloned; } + + public void setSemiJoinHint(Map semiJoinHint) { + this.semiJoinHint = semiJoinHint; + } + + public Map getSemiJoinHint() { + return semiJoinHint; + } + + @Override + public String toString() { + return "QBJoinTree [leftAlias=" + leftAlias + ", rightAliases=" + Arrays.toString(rightAliases) + ", leftAliases=" + Arrays.toString(leftAliases) + ", semiJoinHint=" + semiJoinHint + "]"; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index bdc5bef8a9..aaad97e68c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -8119,6 +8119,7 @@ private Operator genJoinOperatorChildren(QBJoinTree join, Operator left, JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, join.getNoOuterJoin(), joinCondns, filterMap, joinKeys); + desc.setSemiJoinHints(join.getSemiJoinHint()); desc.setReversedExprs(reversedExprs); desc.setFilterMap(join.getFilterMap()); // For outer joins, add filters that apply to more than one input @@ -8666,6 +8667,10 @@ private QBJoinTree genUniqueJoinTree(QB qb, ASTNode joinParseTree, parseStreamTables(joinTree, qb); } + if (qb.getParseInfo().getHints() != null) { + // TODO: do we need this for unique join? + joinTree.setSemiJoinHint(parseSemiJoinHint(qb.getParseInfo().getHints())); + } return joinTree; } @@ -8964,6 +8969,8 @@ private QBJoinTree genJoinTree(QB qb, ASTNode joinParseTree, if ((conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) == false) { parseStreamTables(joinTree, qb); } + + joinTree.setSemiJoinHint(parseSemiJoinHint(qb.getParseInfo().getHints())); } return joinTree; @@ -9011,6 +9018,62 @@ private void parseStreamTables(QBJoinTree joinTree, QB qb) { joinTree.setStreamAliases(streamAliases); } + /** Parses semijoin hints in the query and returns a map from table alias to SemiJoinHint; + * the bloom filter entry count defaults to -1 when not specified. + * A hint's argument list can be in 3 formats: + * 1. TableName, ColumnName, bloom filter entries + * 2. TableName, bloom filter entries + * 3. TableName, ColumnName + * e.g. semi(k, str, 1000) requests semijoin reduction on column str of table k with a 1000-entry bloom filter. + * */ + public Map parseSemiJoinHint(ASTNode hints) throws SemanticException { + if (hints == null) return null; + Map result = null; + for (Node hintNode : hints.getChildren()) { + ASTNode hint = (ASTNode) hintNode; + if (hint.getChild(0).getType() != HintParser.TOK_LEFTSEMIJOIN) continue; + if (result == null) { + result = new HashMap<>(); + } + String alias = null; + String colName = null; + Tree args = hint.getChild(1); + for (int i = 0; i < args.getChildCount(); i++) { + // We can have table names, column names or sizes here (or incorrect hint if the user is so inclined).
+ String text = args.getChild(i).getText(); + Integer number = null; + try { + number = Integer.parseInt(text); + } catch (NumberFormatException ex) { // Ignore. + } + if (number != null) { + if (alias == null) { + throw new SemanticException("Invalid semijoin hint - arg " + i + " (" + + text + ") is a number but the previous one is not an alias"); + } + SemiJoinHint sjHint = new SemiJoinHint(alias, colName, number); + result.put(alias, sjHint); + alias = null; + colName = null; + } else { + if (alias == null) { + alias = text; + } else if (colName == null ){ + colName = text; + } else { + // No bloom filter entries provided. + SemiJoinHint sjHint = new SemiJoinHint(alias, colName, null); + result.put(alias, sjHint); + alias = text; + colName = null; + } + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Semijoin hint parsed: " + result); + } + return result; + } + /** * Merges node to target */ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemiJoinBranchInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemiJoinBranchInfo.java new file mode 100644 index 0000000000..5d7b9e5c6d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemiJoinBranchInfo.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse; + + +import org.apache.hadoop.hive.ql.exec.TableScanOperator; + +public class SemiJoinBranchInfo { + private TableScanOperator ts; + private boolean isHint; + + public SemiJoinBranchInfo(TableScanOperator ts) { + this.ts = ts; + isHint = false; + } + + public SemiJoinBranchInfo(TableScanOperator ts, boolean isHint) { + this.ts = ts; + this.isHint = isHint; + } + + public TableScanOperator getTsOp() { + return ts; + } + + public boolean getIsHint() { + return isHint; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemiJoinHint.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemiJoinHint.java new file mode 100644 index 0000000000..1f24e23ff3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemiJoinHint.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse; + +public class SemiJoinHint { + private String tabAlias; + private String colName; + private Integer numEntries; + + public SemiJoinHint(String tabAlias, String colName, Integer numEntries) { + this.tabAlias = tabAlias; + this.colName = colName; + this.numEntries = numEntries; + } + + public String getTabAlias() { + return tabAlias; + } + + public String getColName() { + return colName; + } + + public Integer getNumEntries() { + return numEntries != null ? numEntries : -1; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 5f9ccc8ceb..04a08139e5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -531,7 +531,8 @@ public ParseContext getParseContext(ParseContext pCtx, List> component, OptimizeTezProcCont TableScanOperator victimTS = null; ReduceSinkOperator victimRS = null; + // If there is a hint and no operator is removed then throw error + boolean hasHint = false; + boolean removed = false; for (Operator o : component) { // Look for AppMasterEventOperator or ReduceSinkOperator if (o instanceof AppMasterEventOperator) { @@ -184,25 +188,34 @@ private void removeCycleOperator(Set> component, OptimizeTezProcCont || o.getStatistics().getDataSize() < victimAM.getStatistics() .getDataSize()) { victimAM = (AppMasterEventOperator) o; + removed = true; } } else if (o instanceof ReduceSinkOperator) { - TableScanOperator ts = context.parseContext.getRsOpToTsOpMap().get(o); - if (ts == null) { + + SemiJoinBranchInfo sjInfo = + context.parseContext.getRsToSemiJoinBranchInfo().get(o); + if (sjInfo == null ) continue; + if (sjInfo.getIsHint()) { + // Skipping because of hint. Mark this info, + hasHint = true; continue; } + + TableScanOperator ts = sjInfo.getTsOp(); // Sanity check assert component.contains(ts); if (victimRS == null || ts.getStatistics().getDataSize() < - victimTS.getStatistics().getDataSize()) { - victimRS = (ReduceSinkOperator) o; - victimTS = ts; - } + victimTS.getStatistics().getDataSize()) { + victimRS = (ReduceSinkOperator) o; + victimTS = ts; + removed = true; } } + } - // Always set the min/max optimization as victim. + // Always set the semijoin optimization as victim. Operator victim = victimRS; if (victimRS == null && victimAM != null ) { @@ -226,6 +239,11 @@ private void removeCycleOperator(Set> component, OptimizeTezProcCont } } + if (hasHint && !removed) { + // There is hint but none of the operators removed. Throw error + throw new SemanticException("The user hint is causing an operator cycle. 
Please fix it and retry"); + } + if (victim == null || (!context.pruningOpsRemovedByPriorOpt.isEmpty() && context.pruningOpsRemovedByPriorOpt.contains(victim))) { @@ -286,11 +304,12 @@ private void connect(Operator o, AtomicInteger index, Stack> node LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString()); children.add(ts); } else if (o instanceof ReduceSinkOperator){ - // min/max case + // semijoin case children = new ArrayList>(); children.addAll(o.getChildOperators()); - TableScanOperator ts = parseContext.getRsOpToTsOpMap().get(o); - if (ts != null) { + SemiJoinBranchInfo sjInfo = parseContext.getRsToSemiJoinBranchInfo().get(o); + if (sjInfo != null ) { + TableScanOperator ts = sjInfo.getTsOp(); LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString()); children.add(ts); } @@ -459,7 +478,7 @@ protected void generateTaskTree(List> rootTasks, Pa if (pCtx.getRsToRuntimeValuesInfoMap().size() > 0) { for (ReduceSinkOperator rs : pCtx.getRsToRuntimeValuesInfoMap().keySet()) { // Process min/max - GenTezUtils.processDynamicMinMaxPushDownOperator( + GenTezUtils.processDynamicSemiJoinPushDownOperator( procCtx, pCtx.getRsToRuntimeValuesInfoMap().get(rs), rs); } } @@ -610,7 +629,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, private static void removeSemijoinOptimizationFromSMBJoins( OptimizeTezProcContext procCtx) throws SemanticException { if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION) || - procCtx.parseContext.getRsOpToTsOpMap().size() == 0) { + procCtx.parseContext.getRsToSemiJoinBranchInfo().size() == 0) { return; } @@ -629,9 +648,9 @@ private static void removeSemijoinOptimizationFromSMBJoins( GraphWalker ogw = new PreOrderOnceWalker(disp); ogw.startWalking(topNodes, null); + List tsOps = new ArrayList<>(); // Iterate over the map and remove semijoin optimizations if needed. for (CommonMergeJoinOperator joinOp : ctx.JoinOpToTsOpMap.keySet()) { - List tsOps = new ArrayList(); // Get one top level TS Op directly from the stack tsOps.add(ctx.JoinOpToTsOpMap.get(joinOp)); @@ -644,7 +663,7 @@ private static void removeSemijoinOptimizationFromSMBJoins( } assert parent instanceof SelectOperator; - while(parent != null) { + while (parent != null) { if (parent instanceof TableScanOperator) { tsOps.add((TableScanOperator) parent); break; @@ -652,20 +671,24 @@ private static void removeSemijoinOptimizationFromSMBJoins( parent = parent.getParentOperators().get(0); } } - - // Now the relevant TableScanOperators are known, find if there exists - // a semijoin filter on any of them, if so, remove it. - ParseContext pctx = procCtx.parseContext; - for (TableScanOperator ts : tsOps) { - for (ReduceSinkOperator rs : pctx.getRsOpToTsOpMap().keySet()) { - if (ts == pctx.getRsOpToTsOpMap().get(rs)) { - // match! - if (LOG.isDebugEnabled()) { - LOG.debug("Semijoin optimization found going to SMB join. Removing semijoin " - + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts)); - } - GenTezUtils.removeBranch(rs); - GenTezUtils.removeSemiJoinOperator(pctx, rs, ts); + } + // Now the relevant TableScanOperators are known, find if there exists + // a semijoin filter on any of them, if so, remove it. + + ParseContext pctx = procCtx.parseContext; + for (TableScanOperator ts : tsOps) { + for (ReduceSinkOperator rs : pctx.getRsToSemiJoinBranchInfo().keySet()) { + SemiJoinBranchInfo sjInfo = pctx.getRsToSemiJoinBranchInfo().get(rs); + if (ts == sjInfo.getTsOp()) { + // match! 
+ if (LOG.isDebugEnabled()) { + LOG.debug("Semijoin optimization found going to SMB join. Removing semijoin " + + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts)); + } + GenTezUtils.removeBranch(rs); + GenTezUtils.removeSemiJoinOperator(pctx, rs, ts); + if (sjInfo.getIsHint()) { + LOG.debug("Removing hinted semijoin as it is with SMB join " + rs + " : " + ts); } } } @@ -692,7 +715,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, private static void removeSemiJoinCyclesDueToMapsideJoins( OptimizeTezProcContext procCtx) throws SemanticException { if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION) || - procCtx.parseContext.getRsOpToTsOpMap().size() == 0) { + procCtx.parseContext.getRsToSemiJoinBranchInfo().size() == 0) { return; } @@ -745,10 +768,10 @@ private static void removeSemiJoinCyclesDueToMapsideJoins( } ReduceSinkOperator rs = ((ReduceSinkOperator) child); - TableScanOperator ts = pCtx.getRsOpToTsOpMap().get(rs); - if (ts == null) { - continue; - } + SemiJoinBranchInfo sjInfo = pCtx.getRsToSemiJoinBranchInfo().get(rs); + if (sjInfo == null) continue; + + TableScanOperator ts = sjInfo.getTsOp(); // This is a semijoin branch. Find if this is creating a potential // cycle with childJoin. for (Operator parent : childJoin.getParentOperators()) { @@ -769,6 +792,9 @@ private static void removeSemiJoinCyclesDueToMapsideJoins( } GenTezUtils.removeBranch(rs); GenTezUtils.removeSemiJoinOperator(pCtx, rs, ts); + if (sjInfo.getIsHint()) { + LOG.debug("Removing hinted semijoin as it is creating cycles with mapside joins " + rs + " : " + ts); + } } } } @@ -783,8 +809,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, assert nd instanceof ReduceSinkOperator; ReduceSinkOperator rs = (ReduceSinkOperator) nd; ParseContext pCtx = ((OptimizeTezProcContext) procCtx).parseContext; - TableScanOperator ts = pCtx.getRsOpToTsOpMap().get(rs); - if (ts == null) { + SemiJoinBranchInfo sjInfo = pCtx.getRsToSemiJoinBranchInfo().get(rs); + if (sjInfo == null) { // nothing to do here. return null; } @@ -794,7 +820,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, GroupByOperator gbOp = (GroupByOperator) (stack.get(stack.size() - 2)); GroupByDesc gbDesc = gbOp.getConf(); ArrayList aggregationDescs = gbDesc.getAggregators(); - boolean removeSemiJoin = false; for (AggregationDesc agg : aggregationDescs) { if (agg.getGenericUDAFName() != "bloom_filter") { continue; @@ -802,38 +827,47 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, GenericUDAFBloomFilterEvaluator udafBloomFilterEvaluator = (GenericUDAFBloomFilterEvaluator) agg.getGenericUDAFEvaluator(); + if (udafBloomFilterEvaluator.hasHintEntries()) + return null; // Created using hint, skip it + long expectedEntries = udafBloomFilterEvaluator.getExpectedEntries(); if (expectedEntries == -1 || expectedEntries > pCtx.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES)) { - removeSemiJoin = true; - if (LOG.isDebugEnabled()) { - LOG.debug("expectedEntries=" + expectedEntries + ". " - + "Either stats unavailable or expectedEntries exceeded max allowable bloomfilter size. " - + "Removing semijoin " - + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts)); + // Remove the semijoin optimization branch along with ALL the mappings + // The parent GB2 has all the branches. Collect them and remove them. 
+ for (Operator op : gbOp.getChildOperators()) { + ReduceSinkOperator rsFinal = (ReduceSinkOperator) op; + TableScanOperator ts = pCtx.getRsToSemiJoinBranchInfo(). + get(rsFinal).getTsOp(); + if (LOG.isDebugEnabled()) { + LOG.debug("expectedEntries=" + expectedEntries + ". " + + "Either stats unavailable or expectedEntries exceeded max allowable bloomfilter size. " + + "Removing semijoin " + + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts)); + } + GenTezUtils.removeBranch(rsFinal); + GenTezUtils.removeSemiJoinOperator(pCtx, rsFinal, ts); } - break; + return null; } } + // At this point, hinted semijoin case has been handled already // Check if big table is big enough that runtime filtering is // worth it. + TableScanOperator ts = sjInfo.getTsOp(); if (ts.getStatistics() != null) { long numRows = ts.getStatistics().getNumRows(); if (numRows < pCtx.getConf().getLongVar(ConfVars.TEZ_BIGTABLE_MIN_SIZE_SEMIJOIN_REDUCTION)) { - removeSemiJoin = true; + // The stats are not annotated, remove the semijoin operator if (LOG.isDebugEnabled()) { LOG.debug("Insufficient rows (" + numRows + ") to justify semijoin optimization. Removing semijoin " - + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts)); + + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts)); } + GenTezUtils.removeBranch(rs); + GenTezUtils.removeSemiJoinOperator(pCtx, rs, ts); } } - - if (removeSemiJoin) { - // The stats are not annotated, remove the semijoin operator - GenTezUtils.removeBranch(rs); - GenTezUtils.removeSemiJoinOperator(pCtx, rs, ts); - } return null; } } @@ -898,15 +932,23 @@ private boolean findParallelSemiJoinBranch(Operator mapjoin, TableScanOperato } ReduceSinkOperator rs = (ReduceSinkOperator) child; - TableScanOperator ts = parseContext.getRsOpToTsOpMap().get(rs); - if (ts == null || ts != bigTableTS) { - // skip, no semijoin or not the one we are looking for. + SemiJoinBranchInfo sjInfo = parseContext.getRsToSemiJoinBranchInfo().get(rs); + if (sjInfo == null) continue; + + TableScanOperator ts = sjInfo.getTsOp(); + if (ts != bigTableTS) { + // skip, not the one we are looking for. continue; } + parallelEdges = true; + + if (sjInfo.getIsHint()) { + // Created by hint, skip it + continue; + } // Add the semijoin branch to the map semijoins.put(rs, ts); - parallelEdges = true; } } } @@ -931,6 +973,8 @@ private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx) topOps.addAll(procCtx.parseContext.getTopOps().values()); Map semijoins = new HashMap<>(); + List rsList = new ArrayList<>(); + List tsList = new ArrayList<>(); for (Operator parent : topOps) { // A TS can have multiple branches due to DPP Or Semijoin Opt. // USe DFS to traverse all the branches until RS is hit. 
@@ -1134,10 +1178,15 @@ private void removeSemijoinOptimizationByBenefit(OptimizeTezProcContext procCtx) } List semijoinRsToRemove = new ArrayList(); - Map map = procCtx.parseContext.getRsOpToTsOpMap(); + Map map = procCtx.parseContext.getRsToSemiJoinBranchInfo(); double semijoinReductionThreshold = procCtx.conf.getFloatVar( HiveConf.ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_THRESHOLD); for (ReduceSinkOperator rs : map.keySet()) { + SemiJoinBranchInfo sjInfo = map.get(rs); + if (sjInfo.getIsHint()) { + // Semijoin created using hint, skip it + continue; + } // rs is semijoin optimization branch, which should look like -SEL-GB1-RS1-GB2-RS2 // Get to the SelectOperator ancestor SelectOperator sel = null; @@ -1152,7 +1201,7 @@ private void removeSemijoinOptimizationByBenefit(OptimizeTezProcContext procCtx) } // Check the ndv/rows from the SEL vs the destination tablescan the semijoin opt is going to. - TableScanOperator ts = map.get(rs); + TableScanOperator ts = sjInfo.getTsOp(); RuntimeValuesInfo rti = procCtx.parseContext.getRsToRuntimeValuesInfoMap().get(rs); ExprNodeDesc tsExpr = rti.getTsColExpr(); // In the SEL operator of the semijoin branch, there should be only one column in the operator @@ -1172,7 +1221,7 @@ private void removeSemijoinOptimizationByBenefit(OptimizeTezProcContext procCtx) } for (ReduceSinkOperator rs : semijoinRsToRemove) { - TableScanOperator ts = map.get(rs); + TableScanOperator ts = map.get(rs).getTsOp(); if (LOG.isDebugEnabled()) { LOG.debug("Reduction factor not satisfied for " + OperatorUtils.getOpNamePretty(rs) + "-" + OperatorUtils.getOpNamePretty(ts) + ". Removing semijoin optimization."); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java index 18e4fbdc0a..3143554ec6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java @@ -18,7 +18,10 @@ package org.apache.hadoop.hive.ql.plan; +import java.util.Map; + import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.parse.SemiJoinHint; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; /** @@ -29,14 +32,17 @@ Operator source; int keyIndex; + Map hints; public ExprNodeDynamicListDesc() { } - public ExprNodeDynamicListDesc(TypeInfo typeInfo, Operator source, int keyIndex) { + public ExprNodeDynamicListDesc(TypeInfo typeInfo, Operator source, + int keyIndex, Map hints) { super(typeInfo); this.source = source; this.keyIndex = keyIndex; + this.hints = hints; } public void setSource(Operator source) { @@ -57,8 +63,7 @@ public int getKeyIndex() { @Override public ExprNodeDesc clone() { - ExprNodeDynamicListDesc clone = new ExprNodeDynamicListDesc(typeInfo, source, keyIndex); - return clone; + return new ExprNodeDynamicListDesc(typeInfo, source, keyIndex, hints); } @Override @@ -78,4 +83,8 @@ public String getExprString() { public String toString() { return source.toString(); } + + public Map getHints() { + return hints; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java index bcf3691342..032c7bb28d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.parse.QBJoinTree; +import 
org.apache.hadoop.hive.ql.parse.SemiJoinHint; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -106,6 +107,10 @@ private transient Map> aliasToOpInfo; private transient boolean leftInputJoin; private transient List streamAliases; + // Note: there are two things in Hive called semi-joins - the left semi join construct, + // and also a bloom-filter based optimization that came later. This is for the latter. + // Everything else in this desc that says "semi-join" is for the former. + private transient Map semiJoinHints; public JoinDesc() { } @@ -197,6 +202,7 @@ public JoinDesc(JoinDesc clone) { this.filterMap = clone.filterMap; this.residualFilterExprs = clone.residualFilterExprs; this.statistics = clone.statistics; + this.semiJoinHints = clone.semiJoinHints; } public Map> getExprs() { @@ -682,4 +688,16 @@ public void cloneQBJoinTreeProps(JoinDesc joinDesc) { streamAliases = joinDesc.streamAliases == null ? null : new ArrayList(joinDesc.streamAliases); } + private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(JoinDesc.class); + public void setSemiJoinHints(Map semiJoinHints) { + if (semiJoinHints != null || this.semiJoinHints != null) { + LOG.debug("Setting semi-join hints to " + semiJoinHints); + } + this.semiJoinHints = semiJoinHints; + } + + public Map getSemiJoinHints() { + return semiJoinHints; + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java index 71c731081f..f45daa8828 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.Stack; +import org.apache.hadoop.hive.ql.parse.SemiJoinHint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -134,13 +135,12 @@ public ParseContext getParseContext() { public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { - ParseContext pCtx = ((SyntheticContext) procCtx).getParseContext(); - @SuppressWarnings("unchecked") CommonJoinOperator join = (CommonJoinOperator) nd; ReduceSinkOperator source = (ReduceSinkOperator) stack.get(stack.size() - 2); int srcPos = join.getParentOperators().indexOf(source); + Map hints = join.getConf().getSemiJoinHints(); List> parents = join.getParentOperators(); @@ -181,7 +181,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, inArgs.add(sourceKeys.get(i)); ExprNodeDynamicListDesc dynamicExpr = - new ExprNodeDynamicListDesc(targetKeys.get(i).getTypeInfo(), target, i); + new ExprNodeDynamicListDesc(targetKeys.get(i).getTypeInfo(), target, i, hints); inArgs.add(dynamicExpr); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java index 2b84beb100..2413ae6d26 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBloomFilter.java @@ -72,6 +72,7 @@ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticE public static class GenericUDAFBloomFilterEvaluator extends GenericUDAFEvaluator { // Source operator to get the number of entries private SelectOperator sourceOperator; + private long hintEntries = -1; private long maxEntries = 0; private long minEntries = 0; private float factor = 1; @@ -254,6 +255,10 @@ public Object terminatePartial(AggregationBuffer agg) throws HiveException { } public long getExpectedEntries() { + // If hint is provided use that size. + if (hintEntries > 0 ) + return hintEntries; + long expectedEntries = -1; if (sourceOperator != null && sourceOperator.getStatistics() != null) { Statistics stats = sourceOperator.getStatistics(); @@ -294,6 +299,14 @@ public void setSourceOperator(SelectOperator sourceOperator) { this.sourceOperator = sourceOperator; } + public void setHintEntries(long hintEntries) { + this.hintEntries = hintEntries; + } + + public boolean hasHintEntries() { + return hintEntries != -1; + } + public void setMaxEntries(long maxEntries) { this.maxEntries = maxEntries; } diff --git a/ql/src/test/queries/clientpositive/semijoin_hint.q b/ql/src/test/queries/clientpositive/semijoin_hint.q new file mode 100644 index 0000000000..8b079e62e7 --- /dev/null +++ b/ql/src/test/queries/clientpositive/semijoin_hint.q @@ -0,0 +1,51 @@ +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; +set hive.cbo.enable=true; +set hive.compute.query.using.stats=false; +set hive.mapred.mode=nonstrict; +set hive.optimize.ppd=true; +set hive.ppd.remove.duplicatefilters=true; +set hive.tez.dynamic.partition.pruning=true; +set hive.tez.dynamic.semijoin.reduction=true; +set hive.optimize.metadataonly=false; +set hive.optimize.index.filter=true; +set hive.stats.autogather=true; +set hive.tez.bigtable.minsize.semijoin.reduction=1; +set hive.tez.min.bloom.filter.entries=1; +set hive.tez.dynamic.semijoin.reduction.threshold=-999999999999; + +-- Create Tables +create table alltypesorc_int ( cint int, cstring string ) stored as ORC; +create table srcpart_date (str string, value string) partitioned by (ds string ) stored as ORC; +CREATE TABLE srcpart_small(key1 STRING, value1 STRING) partitioned by (ds string) STORED as ORC; + +-- Add Partitions +alter table srcpart_date add partition (ds = "2008-04-08"); +alter table srcpart_date add partition (ds = "2008-04-09"); + +alter 
table srcpart_small add partition (ds = "2008-04-08"); +alter table srcpart_small add partition (ds = "2008-04-09"); + +-- Load +insert overwrite table alltypesorc_int select cint, cstring1 from alltypesorc; +insert overwrite table srcpart_date partition (ds = "2008-04-08" ) select key, value from srcpart where ds = "2008-04-08"; +insert overwrite table srcpart_date partition (ds = "2008-04-09") select key, value from srcpart where ds = "2008-04-09"; +insert overwrite table srcpart_small partition (ds = "2008-04-09") select key, value from srcpart where ds = "2008-04-09"; +analyze table alltypesorc_int compute statistics for columns; +analyze table srcpart_date compute statistics for columns; +analyze table srcpart_small compute statistics for columns; + +set hive.cbo.returnpath.hiveop=true; + +create table srccc as select * from src; + +explain select /*+ semi(k, str, 1000)*/ count(*) from srcpart_date k join srcpart_small v on (k.str = v.key1); + +set hive.cbo.returnpath.hiveop=false; + +explain select /*+ semi(k, 1000)*/ count(*) from srcpart_date k join srcpart_small v on (k.str = v.key1); + +set hive.cbo.enable=false; + +explain select /*+ semi(k, str, 1000)*/ count(*) from srcpart_date k join srcpart_small v on (k.str = v.key1); + diff --git a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out index a47ce6e583..1d1f86bfaa 100644 --- a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out +++ b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out @@ -692,13 +692,12 @@ STAGE PLANS: Tez #### A masked pattern was here #### Edges: - Map 1 <- Reducer 5 (BROADCAST_EDGE), Reducer 8 (BROADCAST_EDGE) - Map 7 <- Reducer 6 (BROADCAST_EDGE) - Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE) + Map 1 <- Reducer 5 (BROADCAST_EDGE), Reducer 7 (BROADCAST_EDGE) + Map 6 <- Reducer 5 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE) Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE) - Reducer 6 <- Map 4 (CUSTOM_SIMPLE_EDGE) - Reducer 8 <- Map 7 (CUSTOM_SIMPLE_EDGE) + Reducer 7 <- Map 6 (CUSTOM_SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -752,22 +751,9 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) - Select Operator - expressions: _col0 (type: string) - outputColumnNames: _col0 - Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: PARTIAL - Group By Operator - aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=32) - mode: hash - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL - Reduce Output Operator - sort order: - Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL - value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) Execution mode: llap LLAP IO: all inputs - Map 7 + Map 6 Map Operator Tree: TableScan alias: srcpart_date @@ -848,19 +834,11 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) - Reducer 6 - Execution mode: llap - Reduce Operator Tree: - Group By Operator - 
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=32) - mode: final - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL Reduce Output Operator sort order: Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) - Reducer 8 + Reducer 7 Execution mode: llap Reduce Operator Tree: Group By Operator diff --git a/ql/src/test/results/clientpositive/llap/semijoin_hint.q.out b/ql/src/test/results/clientpositive/llap/semijoin_hint.q.out new file mode 100644 index 0000000000..7a550edeba --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/semijoin_hint.q.out @@ -0,0 +1,531 @@ +PREHOOK: query: create table alltypesorc_int ( cint int, cstring string ) stored as ORC +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@alltypesorc_int +POSTHOOK: query: create table alltypesorc_int ( cint int, cstring string ) stored as ORC +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@alltypesorc_int +PREHOOK: query: create table srcpart_date (str string, value string) partitioned by (ds string ) stored as ORC +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@srcpart_date +POSTHOOK: query: create table srcpart_date (str string, value string) partitioned by (ds string ) stored as ORC +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@srcpart_date +PREHOOK: query: CREATE TABLE srcpart_small(key1 STRING, value1 STRING) partitioned by (ds string) STORED as ORC +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@srcpart_small +POSTHOOK: query: CREATE TABLE srcpart_small(key1 STRING, value1 STRING) partitioned by (ds string) STORED as ORC +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@srcpart_small +PREHOOK: query: alter table srcpart_date add partition (ds = "2008-04-08") +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@srcpart_date +POSTHOOK: query: alter table srcpart_date add partition (ds = "2008-04-08") +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@srcpart_date +POSTHOOK: Output: default@srcpart_date@ds=2008-04-08 +PREHOOK: query: alter table srcpart_date add partition (ds = "2008-04-09") +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@srcpart_date +POSTHOOK: query: alter table srcpart_date add partition (ds = "2008-04-09") +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@srcpart_date +POSTHOOK: Output: default@srcpart_date@ds=2008-04-09 +PREHOOK: query: alter table srcpart_small add partition (ds = "2008-04-08") +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@srcpart_small +POSTHOOK: query: alter table srcpart_small add partition (ds = "2008-04-08") +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@srcpart_small +POSTHOOK: Output: default@srcpart_small@ds=2008-04-08 +PREHOOK: query: alter table srcpart_small add partition (ds = "2008-04-09") +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@srcpart_small +POSTHOOK: query: alter table srcpart_small add partition (ds = "2008-04-09") +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@srcpart_small +POSTHOOK: Output: default@srcpart_small@ds=2008-04-09 +PREHOOK: query: insert overwrite table 
alltypesorc_int select cint, cstring1 from alltypesorc +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@alltypesorc_int +POSTHOOK: query: insert overwrite table alltypesorc_int select cint, cstring1 from alltypesorc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@alltypesorc_int +POSTHOOK: Lineage: alltypesorc_int.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: alltypesorc_int.cstring SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ] +PREHOOK: query: insert overwrite table srcpart_date partition (ds = "2008-04-08" ) select key, value from srcpart where ds = "2008-04-08" +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Output: default@srcpart_date@ds=2008-04-08 +POSTHOOK: query: insert overwrite table srcpart_date partition (ds = "2008-04-08" ) select key, value from srcpart where ds = "2008-04-08" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +POSTHOOK: Output: default@srcpart_date@ds=2008-04-08 +POSTHOOK: Lineage: srcpart_date PARTITION(ds=2008-04-08).str SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: srcpart_date PARTITION(ds=2008-04-08).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table srcpart_date partition (ds = "2008-04-09") select key, value from srcpart where ds = "2008-04-09" +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +PREHOOK: Output: default@srcpart_date@ds=2008-04-09 +POSTHOOK: query: insert overwrite table srcpart_date partition (ds = "2008-04-09") select key, value from srcpart where ds = "2008-04-09" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +POSTHOOK: Output: default@srcpart_date@ds=2008-04-09 +POSTHOOK: Lineage: srcpart_date PARTITION(ds=2008-04-09).str SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: srcpart_date PARTITION(ds=2008-04-09).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table srcpart_small partition (ds = "2008-04-09") select key, value from srcpart where ds = "2008-04-09" +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +PREHOOK: Output: default@srcpart_small@ds=2008-04-09 +POSTHOOK: query: insert overwrite table srcpart_small partition (ds = "2008-04-09") select key, value from srcpart where ds = "2008-04-09" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +POSTHOOK: Output: default@srcpart_small@ds=2008-04-09 +POSTHOOK: Lineage: srcpart_small PARTITION(ds=2008-04-09).key1 SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: srcpart_small PARTITION(ds=2008-04-09).value1 SIMPLE 
[(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: analyze table alltypesorc_int compute statistics for columns +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc_int +PREHOOK: Output: default@alltypesorc_int +#### A masked pattern was here #### +POSTHOOK: query: analyze table alltypesorc_int compute statistics for columns +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc_int +POSTHOOK: Output: default@alltypesorc_int +#### A masked pattern was here #### +PREHOOK: query: analyze table srcpart_date compute statistics for columns +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart_date +PREHOOK: Input: default@srcpart_date@ds=2008-04-08 +PREHOOK: Input: default@srcpart_date@ds=2008-04-09 +PREHOOK: Output: default@srcpart_date +PREHOOK: Output: default@srcpart_date@ds=2008-04-08 +PREHOOK: Output: default@srcpart_date@ds=2008-04-09 +#### A masked pattern was here #### +POSTHOOK: query: analyze table srcpart_date compute statistics for columns +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart_date +POSTHOOK: Input: default@srcpart_date@ds=2008-04-08 +POSTHOOK: Input: default@srcpart_date@ds=2008-04-09 +POSTHOOK: Output: default@srcpart_date +POSTHOOK: Output: default@srcpart_date@ds=2008-04-08 +POSTHOOK: Output: default@srcpart_date@ds=2008-04-09 +#### A masked pattern was here #### +PREHOOK: query: analyze table srcpart_small compute statistics for columns +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart_small +PREHOOK: Input: default@srcpart_small@ds=2008-04-08 +PREHOOK: Input: default@srcpart_small@ds=2008-04-09 +PREHOOK: Output: default@srcpart_small +PREHOOK: Output: default@srcpart_small@ds=2008-04-08 +PREHOOK: Output: default@srcpart_small@ds=2008-04-09 +#### A masked pattern was here #### +POSTHOOK: query: analyze table srcpart_small compute statistics for columns +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart_small +POSTHOOK: Input: default@srcpart_small@ds=2008-04-08 +POSTHOOK: Input: default@srcpart_small@ds=2008-04-09 +POSTHOOK: Output: default@srcpart_small +POSTHOOK: Output: default@srcpart_small@ds=2008-04-08 +POSTHOOK: Output: default@srcpart_small@ds=2008-04-09 +#### A masked pattern was here #### +PREHOOK: query: create table srccc as select * from src +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@src +PREHOOK: Output: database:default +PREHOOK: Output: default@srccc +POSTHOOK: query: create table srccc as select * from src +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@src +POSTHOOK: Output: database:default +POSTHOOK: Output: default@srccc +POSTHOOK: Lineage: srccc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: srccc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: explain select /*+ semi(k, str, 1000)*/ count(*) from srcpart_date k join srcpart_small v on (k.str = v.key1) +PREHOOK: type: QUERY +POSTHOOK: query: explain select /*+ semi(k, str, 1000)*/ count(*) from srcpart_date k join srcpart_small v on (k.str = v.key1) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 5 <- Reducer 4 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 4 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator 
Tree: + TableScan + alias: k + filterExpr: str is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: str is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: str (type: string) + outputColumnNames: str + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: str (type: string) + sort order: + + Map-reduce partition columns: str (type: string) + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: str (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=1000) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + Execution mode: llap + LLAP IO: all inputs + Map 5 + Map Operator Tree: + TableScan + alias: v + filterExpr: (key1 is not null and (key1 BETWEEN DynamicValue(RS_3_k_key1_min) AND DynamicValue(RS_3_k_key1_max) and in_bloom_filter(key1, DynamicValue(RS_3_k_key1_bloom_filter)))) (type: boolean) + Statistics: Num rows: 1000 Data size: 87000 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: (key1 is not null and (key1 BETWEEN DynamicValue(RS_3_k_key1_min) AND DynamicValue(RS_3_k_key1_max) and in_bloom_filter(key1, DynamicValue(RS_3_k_key1_bloom_filter)))) (type: boolean) + Statistics: Num rows: 1000 Data size: 87000 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: key1 (type: string) + outputColumnNames: key1 + Statistics: Num rows: 1000 Data size: 87000 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: key1 (type: string) + sort order: + + Map-reduce partition columns: key1 (type: string) + Statistics: Num rows: 1000 Data size: 87000 Basic stats: COMPLETE Column stats: PARTIAL + Execution mode: llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 str (type: string) + 1 key1 (type: string) + Statistics: Num rows: 9756 Data size: 78048 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + Statistics: Num rows: 9756 Data size: 39024 Basic stats: COMPLETE Column stats: PARTIAL + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: $f0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + table: + input 
format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 4 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=410) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: explain select /*+ semi(k, 1000)*/ count(*) from srcpart_date k join srcpart_small v on (k.str = v.key1) +PREHOOK: type: QUERY +POSTHOOK: query: explain select /*+ semi(k, 1000)*/ count(*) from srcpart_date k join srcpart_small v on (k.str = v.key1) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 5 <- Reducer 4 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 4 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: k + filterExpr: str is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: str is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: str (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=1000) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + Execution mode: llap + LLAP IO: all inputs + Map 5 + Map Operator Tree: + TableScan + alias: v + filterExpr: (key1 is not null and (key1 BETWEEN DynamicValue(RS_6_k_key1_min) AND DynamicValue(RS_6_k_key1_max) and in_bloom_filter(key1, DynamicValue(RS_6_k_key1_bloom_filter)))) (type: boolean) + Statistics: Num rows: 1000 Data size: 87000 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: (key1 is not null and (key1 BETWEEN DynamicValue(RS_6_k_key1_min) AND DynamicValue(RS_6_k_key1_max) and in_bloom_filter(key1, DynamicValue(RS_6_k_key1_bloom_filter)))) (type: boolean) + Statistics: Num rows: 1000 Data size: 87000 Basic stats: COMPLETE Column stats: PARTIAL + 
Select Operator + expressions: key1 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1000 Data size: 87000 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1000 Data size: 87000 Basic stats: COMPLETE Column stats: PARTIAL + Execution mode: llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + Statistics: Num rows: 9756 Data size: 78048 Basic stats: COMPLETE Column stats: PARTIAL + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 4 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=410) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: explain select /*+ semi(k, str, 1000)*/ count(*) from srcpart_date k join srcpart_small v on (k.str = v.key1) +PREHOOK: type: QUERY +POSTHOOK: query: explain select /*+ semi(k, str, 1000)*/ count(*) from srcpart_date k join srcpart_small v on (k.str = v.key1) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 5 <- Reducer 4 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 4 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: k + filterExpr: str is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: str is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: str (type: string) + sort order: + + Map-reduce partition columns: str (type: string) + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: str (type: string) + 
outputColumnNames: _col0 + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=1000) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + Execution mode: llap + LLAP IO: all inputs + Map 5 + Map Operator Tree: + TableScan + alias: v + filterExpr: (key1 is not null and (key1 BETWEEN DynamicValue(RS_3_k_key1_min) AND DynamicValue(RS_3_k_key1_max) and in_bloom_filter(key1, DynamicValue(RS_3_k_key1_bloom_filter)))) (type: boolean) + Statistics: Num rows: 1000 Data size: 87000 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: (key1 is not null and (key1 BETWEEN DynamicValue(RS_3_k_key1_min) AND DynamicValue(RS_3_k_key1_max) and in_bloom_filter(key1, DynamicValue(RS_3_k_key1_bloom_filter)))) (type: boolean) + Statistics: Num rows: 1000 Data size: 87000 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: key1 (type: string) + sort order: + + Map-reduce partition columns: key1 (type: string) + Statistics: Num rows: 1000 Data size: 87000 Basic stats: COMPLETE Column stats: PARTIAL + Execution mode: llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 str (type: string) + 1 key1 (type: string) + Statistics: Num rows: 9756 Data size: 78048 Basic stats: COMPLETE Column stats: PARTIAL + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 4 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=410) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink +
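
A note for readers following the golden output above: the `semi(<table>[, <column>], <numEntries>)` hint shows up in all three recorded plans as the `expectedEntries` argument of the `bloom_filter` aggregate on the hinted source (here `expectedEntries=1000` rather than a value derived from statistics), and the new `hive.tez.dynamic.semijoin.reduction.hint.only` flag from the config change gates whether an unhinted semijoin branch is created at all. The sketch below is a minimal, hypothetical illustration of how a two- or three-argument hint could be decoded into a small value object and combined with that flag; the class and method names are invented for illustration and are not the actual Hive `SemiJoinHint` implementation.

```java
// Hypothetical, simplified sketch -- NOT the actual Hive SemiJoinHint implementation.
// Illustrates how /*+ semi(k, str, 1000) */ or /*+ semi(k, 1000) */ could be decoded:
// the last argument becomes the bloom filter's expectedEntries (seen in the plans above
// as "bloom_filter(_col0, expectedEntries=1000)"), the first names the source table
// alias, and the optional middle argument names the key column.
import java.util.List;

public class SemiJoinHintSketch {
  private final String tableAlias;    // e.g. "k"
  private final String colName;       // e.g. "str", or null for the two-argument form
  private final long expectedEntries; // e.g. 1000, overrides the stats-based estimate

  private SemiJoinHintSketch(String tableAlias, String colName, long expectedEntries) {
    this.tableAlias = tableAlias;
    this.colName = colName;
    this.expectedEntries = expectedEntries;
  }

  /** Decode the argument list of a semi(...) hint; accepts 2 or 3 arguments. */
  public static SemiJoinHintSketch parse(List<String> args) {
    if (args.size() == 3) {
      return new SemiJoinHintSketch(args.get(0), args.get(1), Long.parseLong(args.get(2)));
    } else if (args.size() == 2) {
      return new SemiJoinHintSketch(args.get(0), null, Long.parseLong(args.get(1)));
    }
    throw new IllegalArgumentException("semi() hint expects 2 or 3 arguments, got " + args);
  }

  /**
   * Mirrors the intent of hive.tez.dynamic.semijoin.reduction.hint.only:
   * when the flag is true, a semijoin branch is created only if a hint is present.
   */
  public static boolean shouldCreateSemiJoin(SemiJoinHintSketch hint, boolean hintOnly) {
    return !hintOnly || hint != null;
  }

  public String getTableAlias() { return tableAlias; }
  public String getColName() { return colName; }
  public long getExpectedEntries() { return expectedEntries; }

  // Tiny usage example matching the queries in semijoin_hint.q.
  public static void main(String[] unused) {
    SemiJoinHintSketch threeArg = parse(List.of("k", "str", "1000"));
    SemiJoinHintSketch twoArg = parse(List.of("k", "1000"));
    System.out.println(threeArg.getExpectedEntries()); // 1000
    System.out.println(twoArg.getColName());           // null
  }
}
```

Under this reading, the three explain variants recorded in semijoin_hint.q.out (CBO return path on, CBO return path off, and CBO disabled) all carry the hinted value through to the hash-mode Group By's `bloom_filter(..., expectedEntries=1000)`, which is the behavior the golden file records.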