diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index beed6b8..4a22d88 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -43,11 +43,15 @@
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
@@ -717,6 +721,7 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo
     Operator<? extends OperatorDesc> parentBigTableOp =
         mapJoinOp.getParentOperators().get(bigTablePosition);
     if (parentBigTableOp instanceof ReduceSinkOperator) {
+      Operator<?> parentSelectOpOfBigTableOp = parentBigTableOp.getParentOperators().get(0);
       if (removeReduceSink) {
         for (Operator<?> p : parentBigTableOp.getParentOperators()) {
           // we might have generated a dynamic partition operator chain. Since
@@ -759,11 +764,65 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo
         }
         op.getChildOperators().remove(joinOp);
       }
+
+      // Remove any semijoin branch that would now create a cycle.
+      if (context.parseContext.getRsOpToTsOpMap().size() > 0) {
+        removeCycleCreatingSemiJoinOps(mapJoinOp, parentSelectOpOfBigTableOp,
+            context.parseContext);
+      }
     }
     return mapJoinOp;
   }
 
+  // Remove any semijoin branch associated with the mapjoin's parent operator
+  // pipeline that can cause a cycle after the mapjoin optimization.
+  private void removeCycleCreatingSemiJoinOps(MapJoinOperator mapjoinOp,
+                                              Operator<?> parentSelectOpOfBigTable,
+                                              ParseContext parseContext) throws SemanticException {
+    boolean semiJoinCycle = false;
+    ReduceSinkOperator rs = null;
+    TableScanOperator ts = null;
+    for (Operator<?> op : parentSelectOpOfBigTable.getChildOperators()) {
+      if (!(op instanceof SelectOperator)) {
+        continue;
+      }
+
+      while (op.getChildOperators().size() > 0) {
+        op = op.getChildOperators().get(0);
+        if (!(op instanceof ReduceSinkOperator)) {
+          continue;
+        }
+        rs = (ReduceSinkOperator) op;
+        ts = parseContext.getRsOpToTsOpMap().get(rs);
+        if (ts == null) {
+          continue;
+        }
+        for (Operator<?> parent : mapjoinOp.getParentOperators()) {
+          if (!(parent instanceof ReduceSinkOperator)) {
+            continue;
+          }
+
+          Set<TableScanOperator> tsOps = OperatorUtils.findOperatorsUpstream(parent,
+              TableScanOperator.class);
+          parent = tsOps.iterator().next();
+
+          // If the parent is the same as the ts, then we have a cycle.
+          if (ts == parent) {
+            semiJoinCycle = true;
+            break;
+          }
+        }
+      }
+    }
+
+    // By design there can be at most one such cycle.
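+    // Removal happens here, outside the walk above, so the operator tree is
+    // not mutated while it is still being traversed; rs and ts identify the
+    // offending branch.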
+    if (semiJoinCycle) {
+      GenTezUtils.removeBranch(rs);
+      GenTezUtils.removeSemiJoinOperator(parseContext, rs, ts);
+    }
+  }
+
   private AppMasterEventOperator findDynamicPartitionBroadcast(Operator<?> parent) {
 
     for (Operator<?> op : parent.getChildOperators()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 6141391..aee74ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -568,7 +569,7 @@ public static void removeSemiJoinOperator(ParseContext context,
         TypeInfoFactory.booleanTypeInfo, Boolean.TRUE);
     DynamicValuePredicateContext filterDynamicValuePredicatesCollection =
         new DynamicValuePredicateContext();
-    collectDynamicValuePredicates(ts.getConf().getFilterExpr(),
+    collectDynamicValuePredicates(((FilterOperator) (ts.getChildOperators().get(0))).getConf().getPredicate(),
         filterDynamicValuePredicatesCollection);
     for (ExprNodeDesc nodeToRemove : filterDynamicValuePredicatesCollection
         .childParentMapping.keySet()) {
diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q
index 13797c0..e686af6 100644
--- a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q
+++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q
@@ -62,6 +62,18 @@ select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcp
 set hive.tez.dynamic.semijoin.reduction=true;
 EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
 select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+--set hive.tez.dynamic.semijoin.reduction=false;
+
+-- With Mapjoins.
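+-- The oversized noconditionaltask.size below is deliberate: it keeps the
+-- small table under the threshold so the join always converts to a mapjoin.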
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=100000000000;
+
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+set hive.tez.dynamic.semijoin.reduction=true;
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
 
 drop table srcpart_date;
 drop table srcpart_small;
diff --git a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out
index e89526e..cacde93 100644
--- a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out
+++ b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out
@@ -1509,6 +1509,280 @@ POSTHOOK: Input: default@srcpart_small@ds=2008-04-08
 POSTHOOK: Input: default@srcpart_small@ds=2008-04-09
 #### A masked pattern was here ####
 0
+PREHOOK: query: EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 3 (BROADCAST_EDGE), Reducer 4 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+        Reducer 4 <- Map 3 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_date
+                  filterExpr: (key is not null and key BETWEEN DynamicValue(RS_7_srcpart_small_key_min) AND DynamicValue(RS_7_srcpart_small_key_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_key_bloom_filter))) (type: boolean)
+                  Statistics: Num rows: 2000 Data size: 368000 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key is not null and key BETWEEN DynamicValue(RS_7_srcpart_small_key_min) AND DynamicValue(RS_7_srcpart_small_key_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_key_bloom_filter))) (type: boolean)
+                    Statistics: Num rows: 2000 Data size: 368000 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 2000 Data size: 368000 Basic stats: COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+                        input vertices:
+                          1 Map 3
+                        Statistics: Num rows: 2200 Data size: 404800 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          aggregations: count()
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order:
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: bigint)
+            Execution mode: llap
+            LLAP IO: all inputs
+        Map 3
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_small
+                  filterExpr: key1 is not null (type: boolean)
+                  Statistics: Num rows: 1000 Data size: 184000 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key1 is not null (type: boolean)
+                    Statistics: Num rows: 1000 Data size: 184000 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key1 (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1000 Data size: 184000 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 1000 Data size: 184000 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1000 Data size: 184000 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=1000)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order:
+                            Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+            Execution mode: llap
+            LLAP IO: all inputs
+        Reducer 2
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 4
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=1000)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order:
+                  Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_date
+PREHOOK: Input: default@srcpart_date@ds=2008-04-08
+PREHOOK: Input: default@srcpart_date@ds=2008-04-09
+PREHOOK: Input: default@srcpart_small
+PREHOOK: Input: default@srcpart_small@ds=2008-04-08
+PREHOOK: Input: default@srcpart_small@ds=2008-04-09
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_date
+POSTHOOK: Input: default@srcpart_date@ds=2008-04-08
+POSTHOOK: Input: default@srcpart_date@ds=2008-04-09
+POSTHOOK: Input: default@srcpart_small
+POSTHOOK: Input: default@srcpart_small@ds=2008-04-08
+POSTHOOK: Input: default@srcpart_small@ds=2008-04-09
+#### A masked pattern was here ####
+8224
+PREHOOK: query: EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 3 (BROADCAST_EDGE), Reducer 4 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+        Reducer 4 <- Map 3 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_date
+                  filterExpr: (key is not null and key BETWEEN DynamicValue(RS_7_srcpart_small_key_min) AND DynamicValue(RS_7_srcpart_small_key_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_key_bloom_filter))) (type: boolean)
+                  Statistics: Num rows: 2000 Data size: 368000 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key is not null and key BETWEEN DynamicValue(RS_7_srcpart_small_key_min) AND DynamicValue(RS_7_srcpart_small_key_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_key_bloom_filter))) (type: boolean)
+                    Statistics: Num rows: 2000 Data size: 368000 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 2000 Data size: 368000 Basic stats: COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+                        input vertices:
+                          1 Map 3
+                        Statistics: Num rows: 2200 Data size: 404800 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          aggregations: count()
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order:
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: bigint)
+            Execution mode: llap
+            LLAP IO: all inputs
+        Map 3
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_small
+                  filterExpr: key1 is not null (type: boolean)
+                  Statistics: Num rows: 1000 Data size: 184000 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key1 is not null (type: boolean)
+                    Statistics: Num rows: 1000 Data size: 184000 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key1 (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1000 Data size: 184000 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 1000 Data size: 184000 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1000 Data size: 184000 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=1000)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order:
+                            Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+            Execution mode: llap
+            LLAP IO: all inputs
+        Reducer 2
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 4
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=1000)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order:
+                  Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_date
+PREHOOK: Input: default@srcpart_date@ds=2008-04-08
+PREHOOK: Input: default@srcpart_date@ds=2008-04-09
+PREHOOK: Input: default@srcpart_small
+PREHOOK: Input: default@srcpart_small@ds=2008-04-08
+PREHOOK: Input: default@srcpart_small@ds=2008-04-09
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_date
+POSTHOOK: Input: default@srcpart_date@ds=2008-04-08
+POSTHOOK: Input: default@srcpart_date@ds=2008-04-09
+POSTHOOK: Input: default@srcpart_small
+POSTHOOK: Input: default@srcpart_small@ds=2008-04-08
+POSTHOOK: Input: default@srcpart_small@ds=2008-04-09
+#### A masked pattern was here ####
+8224
 PREHOOK: query: drop table srcpart_date
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@srcpart_date
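
Note on the cycle this patch removes: the semijoin branch hangs off the big
table's pipeline and feeds a runtime filter into the small table's scan, while
the mapjoin conversion makes the big table's vertex consume the small table's
broadcast output, so the big-table vertex would depend on itself. The
standalone Java sketch below is not Hive code; the Op type, the upstream()
helper, and the operator names are invented for illustration. Only the
upstream walk mirrors what removeCycleCreatingSemiJoinOps checks through
OperatorUtils.findOperatorsUpstream().

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SemiJoinCycleSketch {
  // Minimal stand-in for Hive's Operator: a name plus parent edges.
  static final class Op {
    final String name;
    final List<Op> parents = new ArrayList<>();
    Op(String name) { this.name = name; }
    Op from(Op... ps) { parents.addAll(Arrays.asList(ps)); return this; }
    @Override public String toString() { return name; }
  }

  // Everything reachable by walking parent edges, i.e. "upstream" of start.
  static Set<Op> upstream(Op start) {
    Set<Op> seen = new HashSet<>();
    Deque<Op> work = new ArrayDeque<>(start.parents);
    while (!work.isEmpty()) {
      Op op = work.poll();
      if (seen.add(op)) {
        work.addAll(op.parents);
      }
    }
    return seen;
  }

  public static void main(String[] args) {
    // Big-table pipeline: TS(srcpart_date) -> SEL, feeding the mapjoin.
    Op tsBig = new Op("TS(srcpart_date)");
    Op selBig = new Op("SEL(big)").from(tsBig);

    // Semijoin branch off that pipeline: SEL -> GBY(min/max/bloom) -> RS.
    Op selSemi = new Op("SEL(semijoin)").from(selBig);
    Op rsSemi = new Op("RS(semijoin)").from(new Op("GBY(min,max,bloom)").from(selSemi));

    // Small-table pipeline; its scan consumes the semijoin runtime filter,
    // which is the extra dependency edge that closes the loop.
    Op tsSmall = new Op("TS(srcpart_small)").from(rsSemi);
    Op rsSmall = new Op("RS(broadcast)").from(new Op("SEL(small)").from(tsSmall));

    // The patch's check, in miniature: the semijoin targets a table scan that
    // sits upstream of the mapjoin's broadcast input, so converting the join
    // would make the big-table vertex transitively depend on itself.
    System.out.println("semijoin target feeds the broadcast input: "
        + upstream(rsSmall).contains(tsSmall));  // true
    System.out.println("broadcast input depends on the big-table scan: "
        + upstream(rsSmall).contains(tsBig));    // true -> cycle

    // GenTezUtils.removeBranch()/removeSemiJoinOperator() cut exactly this
    // edge; dropping it here breaks the cycle.
    tsSmall.parents.remove(rsSemi);
    System.out.println("after removal: " + upstream(rsSmall).contains(tsBig)); // false
  }
}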