diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java index eb3eba536c..e1a69526bc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java @@ -459,6 +459,16 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex return false; } + // Check if there already exists a semijoin branch + GroupByOperator gb = parseContext.getColExprToGBMap().get(key); + if (gb != null) { + // Already an existing semijoin branch, reuse it + createFinalRsForSemiJoinOp(parseContext, ts, gb, key, keyBaseAlias, + ctx.parent.getChildren().get(0), sjHint != null); + // done! + return true; + } + List keyExprs = new ArrayList(); keyExprs.add(key); @@ -726,6 +736,7 @@ private void createFinalRsForSemiJoinOp( runtimeValuesInfo.setColExprs(rsValueCols); runtimeValuesInfo.setTsColExpr(colExpr); parseContext.getRsToRuntimeValuesInfoMap().put(rsOpFinal, runtimeValuesInfo); + parseContext.getColExprToGBMap().put(key, gb); } private Map collectDynamicPruningConditions(ExprNodeDesc pred, NodeProcessorCtx ctx) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index 9a69f90483..3a1f821bd3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -120,6 +120,8 @@ new HashMap(); private Map rsToSemiJoinBranchInfo = new HashMap<>(); + private Map colExprToGBMap = + new HashMap<>(); public ParseContext() { } @@ -662,4 +664,12 @@ public void setRsToSemiJoinBranchInfo(Map getRsToSemiJoinBranchInfo() { return rsToSemiJoinBranchInfo; } + + public void setColExprToGBMap(Map colExprToGBMap) { + this.colExprToGBMap = colExprToGBMap; + } + + public Map getColExprToGBMap() { + return colExprToGBMap; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 96525b4b66..5ea7800528 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -532,6 +532,7 @@ public ParseContext getParseContext(ParseContext pCtx, List stack, NodeProcessorCtx procCtx, GroupByOperator gbOp = (GroupByOperator) (stack.get(stack.size() - 2)); GroupByDesc gbDesc = gbOp.getConf(); ArrayList aggregationDescs = gbDesc.getAggregators(); - boolean removeSemiJoin = false; - TableScanOperator ts = sjInfo.getTsOp(); for (AggregationDesc agg : aggregationDescs) { if (agg.getGenericUDAFName() != "bloom_filter") { continue; @@ -841,36 +839,40 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, long expectedEntries = udafBloomFilterEvaluator.getExpectedEntries(); if (expectedEntries == -1 || expectedEntries > pCtx.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES)) { - removeSemiJoin = true; - if (LOG.isDebugEnabled()) { - LOG.debug("expectedEntries=" + expectedEntries + ". " - + "Either stats unavailable or expectedEntries exceeded max allowable bloomfilter size. " - + "Removing semijoin " - + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts)); + // Remove the semijoin optimization branch along with ALL the mappings + // The parent GB2 has all the branches. Collect them and remove them. + for (Operator op : gbOp.getChildOperators()) { + ReduceSinkOperator rsFinal = (ReduceSinkOperator) op; + TableScanOperator ts = pCtx.getRsToSemiJoinBranchInfo(). + get(rsFinal).getTsOp(); + if (LOG.isDebugEnabled()) { + LOG.debug("expectedEntries=" + expectedEntries + ". " + + "Either stats unavailable or expectedEntries exceeded max allowable bloomfilter size. " + + "Removing semijoin " + + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts)); + } + GenTezUtils.removeBranch(rsFinal); + GenTezUtils.removeSemiJoinOperator(pCtx, rsFinal, ts); } - break; + return null; } } // At this point, hinted semijoin case has been handled already // Check if big table is big enough that runtime filtering is // worth it. + TableScanOperator ts = sjInfo.getTsOp(); if (ts.getStatistics() != null) { long numRows = ts.getStatistics().getNumRows(); if (numRows < pCtx.getConf().getLongVar(ConfVars.TEZ_BIGTABLE_MIN_SIZE_SEMIJOIN_REDUCTION)) { - removeSemiJoin = true; if (LOG.isDebugEnabled()) { LOG.debug("Insufficient rows (" + numRows + ") to justify semijoin optimization. Removing semijoin " + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts)); } + GenTezUtils.removeBranch(rs); + GenTezUtils.removeSemiJoinOperator(pCtx, rs, ts); } } - if (removeSemiJoin) { - // The stats are not annotated, remove the semijoin operator - GenTezUtils.removeBranch(rs); - GenTezUtils.removeSemiJoinOperator(pCtx, rs, ts); - } - return null; } } diff --git a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out index a47ce6e583..1d1f86bfaa 100644 --- a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out +++ b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out @@ -692,13 +692,12 @@ STAGE PLANS: Tez #### A masked pattern was here #### Edges: - Map 1 <- Reducer 5 (BROADCAST_EDGE), Reducer 8 (BROADCAST_EDGE) - Map 7 <- Reducer 6 (BROADCAST_EDGE) - Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE) + Map 1 <- Reducer 5 (BROADCAST_EDGE), Reducer 7 (BROADCAST_EDGE) + Map 6 <- Reducer 5 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE) Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE) - Reducer 6 <- Map 4 (CUSTOM_SIMPLE_EDGE) - Reducer 8 <- Map 7 (CUSTOM_SIMPLE_EDGE) + Reducer 7 <- Map 6 (CUSTOM_SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -752,22 +751,9 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) - Select Operator - expressions: _col0 (type: string) - outputColumnNames: _col0 - Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: PARTIAL - Group By Operator - aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=32) - mode: hash - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL - Reduce Output Operator - sort order: - Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL - value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) Execution mode: llap LLAP IO: all inputs - Map 7 + Map 6 Map Operator Tree: TableScan alias: srcpart_date @@ -848,19 +834,11 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) - Reducer 6 - Execution mode: llap - Reduce Operator Tree: - Group By Operator - aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=32) - mode: final - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL Reduce Output Operator sort order: Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) - Reducer 8 + Reducer 7 Execution mode: llap Reduce Operator Tree: Group By Operator