diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7cee344295..056f2d7834 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2927,6 +2927,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Big table for runtime filteting should be of atleast this size"), TEZ_DYNAMIC_SEMIJOIN_REDUCTION_THRESHOLD("hive.tez.dynamic.semijoin.reduction.threshold", (float) 0.50, "Only perform semijoin optimization if the estimated benefit at or above this fraction of the target table"), + TEZ_DYNAMIC_SEMIJOIN_REDUCTION_FOR_MAPJOIN("hive.tez.dynamic.semijoin.reduction.for.mapjoin", false, + "Use a semi-join branch for map-joins. This may not make it faster, but is helpful in certain join patterns."), TEZ_SMB_NUMBER_WAVES( "hive.tez.smb.number.waves", (float) 0.5, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 5614c26819..2b575b571d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -1058,8 +1058,10 @@ private boolean findParallelSemiJoinBranch(Operator mapjoin, TableScanOperato private void removeSemijoinsParallelToMapJoin(OptimizeTezProcContext procCtx) throws SemanticException { if(!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION) || - !procCtx.conf.getBoolVar(ConfVars.HIVECONVERTJOIN)) { - // Not needed without semi-join reduction + !procCtx.conf.getBoolVar(ConfVars.HIVECONVERTJOIN) || + procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_FOR_MAPJOIN)) { + // Not needed without semi-join reduction or mapjoins or when semijoins + // are enabled for parallel mapjoins. 
return; } diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q index d631401760..b22890bc9d 100644 --- a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q +++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q @@ -76,7 +76,7 @@ select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcp EXPLAIN extended select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1); set hive.tez.dynamic.semijoin.reduction=false; --- With Mapjoins. +-- With Mapjoins, there shouldn't be any semijoin parallel to mapjoin. set hive.auto.convert.join=true; set hive.auto.convert.join.noconditionaltask=true; set hive.auto.convert.join.noconditionaltask.size=100000000000; @@ -86,6 +86,11 @@ select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcp set hive.tez.dynamic.semijoin.reduction=true; EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1); select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1); +set hive.tez.dynamic.semijoin.reduction.for.mapjoin=true; +-- Enable semijoin parallel to mapjoins. 
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1); +select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1); +set hive.tez.dynamic.semijoin.reduction.for.mapjoin=false; set hive.tez.dynamic.semijoin.reduction=false; -- multiple sources, different keys @@ -94,6 +99,11 @@ select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcp set hive.tez.dynamic.semijoin.reduction=true; EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring); select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring); +set hive.tez.dynamic.semijoin.reduction.for.mapjoin=true; +-- Enable semijoin parallel to mapjoins. NOTE: this re-runs the two-table join only, not the three-table join above. +EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1); +select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1); +set hive.tez.dynamic.semijoin.reduction.for.mapjoin=false; --set hive.tez.dynamic.semijoin.reduction=false; -- With unions diff --git a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out index 2eedb6efb3..478b0828a3 100644 --- a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out +++ b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out @@ -2103,6 +2103,143 @@ POSTHOOK: Input: default@srcpart_small@ds=2008-04-08 POSTHOOK: Input: default@srcpart_small@ds=2008-04-09 #### A masked pattern was here #### 176 +PREHOOK: query: EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select count(*) from srcpart_date join 
srcpart_small on (srcpart_date.key = srcpart_small.key1) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE), Reducer 4 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) + Reducer 4 <- Map 3 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: srcpart_date + filterExpr: (key is not null and (key BETWEEN DynamicValue(RS_7_srcpart_small_key1_min) AND DynamicValue(RS_7_srcpart_small_key1_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_key1_bloom_filter)))) (type: boolean) + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (key is not null and (key BETWEEN DynamicValue(RS_7_srcpart_small_key1_min) AND DynamicValue(RS_7_srcpart_small_key1_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_key1_bloom_filter)))) (type: boolean) + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + input vertices: + 1 Map 3 + Statistics: Num rows: 129 Data size: 1032 Basic stats: COMPLETE Column stats: PARTIAL + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col0 (type: bigint) + Execution mode: llap + LLAP IO: all inputs + Map 3 + Map Operator Tree: + TableScan + alias: 
srcpart_small + filterExpr: key1 is not null (type: boolean) + Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: key1 is not null (type: boolean) + Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: key1 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: PARTIAL + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=40) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + Execution mode: llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 4 + Execution mode: llap + Reduce Operator Tree: + Group By 
Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=40) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart_date +PREHOOK: Input: default@srcpart_date@ds=2008-04-08 +PREHOOK: Input: default@srcpart_date@ds=2008-04-09 +PREHOOK: Input: default@srcpart_small +PREHOOK: Input: default@srcpart_small@ds=2008-04-08 +PREHOOK: Input: default@srcpart_small@ds=2008-04-09 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart_date +POSTHOOK: Input: default@srcpart_date@ds=2008-04-08 +POSTHOOK: Input: default@srcpart_date@ds=2008-04-09 +POSTHOOK: Input: default@srcpart_small +POSTHOOK: Input: default@srcpart_small@ds=2008-04-08 +POSTHOOK: Input: default@srcpart_small@ds=2008-04-09 +#### A masked pattern was here #### +176 PREHOOK: query: EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring) PREHOOK: type: QUERY POSTHOOK: query: EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring) @@ -2391,6 +2528,143 @@ POSTHOOK: Input: default@srcpart_small@ds=2008-04-08 POSTHOOK: Input: 
default@srcpart_small@ds=2008-04-09 #### A masked pattern was here #### 0 +PREHOOK: query: EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE), Reducer 4 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) + Reducer 4 <- Map 3 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: srcpart_date + filterExpr: (key is not null and (key BETWEEN DynamicValue(RS_7_srcpart_small_key1_min) AND DynamicValue(RS_7_srcpart_small_key1_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_key1_bloom_filter)))) (type: boolean) + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (key is not null and (key BETWEEN DynamicValue(RS_7_srcpart_small_key1_min) AND DynamicValue(RS_7_srcpart_small_key1_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_key1_bloom_filter)))) (type: boolean) + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + input vertices: + 1 Map 3 + Statistics: Num rows: 129 Data size: 1032 Basic stats: COMPLETE Column stats: PARTIAL + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: PARTIAL + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col0 (type: bigint) + Execution mode: llap + LLAP IO: all inputs + Map 3 + Map Operator Tree: + TableScan + alias: srcpart_small + filterExpr: key1 is not null (type: boolean) + Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: key1 is not null (type: boolean) + Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: key1 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: PARTIAL + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=40) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + Execution mode: llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: 
PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 4 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=40) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart_date +PREHOOK: Input: default@srcpart_date@ds=2008-04-08 +PREHOOK: Input: default@srcpart_date@ds=2008-04-09 +PREHOOK: Input: default@srcpart_small +PREHOOK: Input: default@srcpart_small@ds=2008-04-08 +PREHOOK: Input: default@srcpart_small@ds=2008-04-09 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart_date +POSTHOOK: Input: default@srcpart_date@ds=2008-04-08 +POSTHOOK: Input: default@srcpart_date@ds=2008-04-09 +POSTHOOK: Input: default@srcpart_small +POSTHOOK: Input: default@srcpart_small@ds=2008-04-08 +POSTHOOK: Input: default@srcpart_small@ds=2008-04-09 +#### A masked pattern was here #### +176 PREHOOK: query: explain select * from alltypesorc_int join (select srcpart_date.key as key from srcpart_date union all