diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 6631a6e45d..6664cefcdb 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3014,6 +3014,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Only perform semijoin optimization if the estimated benefit at or above this fraction of the target table"), TEZ_DYNAMIC_SEMIJOIN_REDUCTION_FOR_MAPJOIN("hive.tez.dynamic.semijoin.reduction.for.mapjoin", false, "Use a semi-join branch for map-joins. This may not make it faster, but is helpful in certain join patterns."), + TEZ_DYNAMIC_SEMIJOIN_REDUCTION_FOR_DPP_FACTOR("hive.tez.dynamic.semijoin.reduction.for.dpp.factor", (float) 1.0, + "The factor to control if semijoin branch feeding into a TS which has DPP based on nDVs"), TEZ_SMB_NUMBER_WAVES( "hive.tez.smb.number.waves", (float) 0.5, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index da30c3b642..bb4acbec6b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -1414,7 +1414,12 @@ private void markSemiJoinForDPP(OptimizeTezProcContext procCtx) ExprNodeColumnDesc tsColExpr = ExprNodeDescUtils.getColumnExpr(tsExpr); long nDVsOfTS = filStats.getColumnStatisticsFromColName( tsColExpr.getColumn()).getCountDistint(); - if (nDVsOfTS >= nDVs) { + float nDVsFactored = nDVs * procCtx.conf.getFloatVar( + ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_FOR_DPP_FACTOR); + if (nDVsOfTS > nDVsFactored) { + if (LOG.isDebugEnabled()) { + LOG.debug("nDVs = " + nDVs + ", nDVsFactored = " + nDVsFactored + " and nDVsOfTS = " + nDVsOfTS); + } sjInfo.setShouldRemove(false); } } diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q index 6cc0a7f7a9..65450d922f 100644 --- a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q +++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q @@ -124,6 +124,13 @@ set hive.tez.dynamic.semijoin.reduction=true; EXPLAIN select count(*) from srcpart_small10, srcpart_small, srcpart_date where srcpart_small.key1 = srcpart_small10.key1 and srcpart_date.ds = srcpart_small.ds; select count(*) from srcpart_small10, srcpart_small, srcpart_date where srcpart_small.key1 = srcpart_small10.key1 and srcpart_date.ds = srcpart_small.ds; +-- HIVE-17936 +set hive.tez.dynamic.semijoin.reduction.for.dpp.factor = 1.5; +EXPLAIN select count(*) from srcpart_small10, srcpart_small, srcpart_date where srcpart_small.key1 = srcpart_small10.key1 and srcpart_date.ds = srcpart_small.ds; +-- semijoin branch should be removed. +set hive.tez.dynamic.semijoin.reduction.for.dpp.factor = 2.5; +EXPLAIN select count(*) from srcpart_small10, srcpart_small, srcpart_date where srcpart_small.key1 = srcpart_small10.key1 and srcpart_date.ds = srcpart_small.ds; + -- With unions explain select * from alltypesorc_int join (select srcpart_date.key as key from srcpart_date diff --git a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out index 1a1a4d9b2d..67fe41e223 100644 --- a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out +++ b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction.q.out @@ -3411,6 +3411,302 @@ POSTHOOK: Input: default@srcpart_small@ds=2008-04-08 POSTHOOK: Input: default@srcpart_small@ds=2008-04-09 #### A masked pattern was here #### 10000 +PREHOOK: query: EXPLAIN select count(*) from srcpart_small10, srcpart_small, srcpart_date where srcpart_small.key1 = srcpart_small10.key1 and srcpart_date.ds = srcpart_small.ds +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select count(*) from srcpart_small10, srcpart_small, srcpart_date where srcpart_small.key1 = srcpart_small10.key1 and srcpart_date.ds = srcpart_small.ds +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Map 2 (BROADCAST_EDGE), Reducer 3 (BROADCAST_EDGE) + Map 4 <- Map 1 (BROADCAST_EDGE) + Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE) + Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: srcpart_small + filterExpr: (key1 is not null and (key1 BETWEEN DynamicValue(RS_10_srcpart_small10_key1_min) AND DynamicValue(RS_10_srcpart_small10_key1_max) and in_bloom_filter(key1, DynamicValue(RS_10_srcpart_small10_key1_bloom_filter)))) (type: boolean) + Statistics: Num rows: 20 Data size: 5420 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: ((key1 BETWEEN DynamicValue(RS_10_srcpart_small10_key1_min) AND DynamicValue(RS_10_srcpart_small10_key1_max) and in_bloom_filter(key1, DynamicValue(RS_10_srcpart_small10_key1_bloom_filter))) and key1 is not null) (type: boolean) + Statistics: Num rows: 20 Data size: 5420 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: key1 (type: string), ds (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 20 Data size: 5420 Basic stats: COMPLETE Column stats: PARTIAL + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col1 + input vertices: + 1 Map 2 + Statistics: Num rows: 10 Data size: 1840 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: _col1 (type: string) + sort order: + + Map-reduce partition columns: _col1 (type: string) + Statistics: Num rows: 10 Data size: 1840 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: _col1 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 10 Data size: 1840 Basic stats: COMPLETE Column stats: PARTIAL + Group By Operator + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: PARTIAL + Dynamic Partitioning Event Operator + Target column: ds (string) + Target Input: srcpart_date + Partition key expr: ds + Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: PARTIAL + Target Vertex: Map 4 + Execution mode: llap + LLAP IO: all inputs + Map 2 + Map Operator Tree: + TableScan + alias: srcpart_small10 + filterExpr: key1 is not null (type: boolean) + Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key1 is not null (type: boolean) + Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key1 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=10) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + Execution mode: llap + LLAP IO: no inputs + Map 4 + Map Operator Tree: + TableScan + alias: srcpart_date + filterExpr: ds is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 720000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: ds (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 2000 Data size: 368000 Basic stats: COMPLETE Column stats: COMPLETE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col1 (type: string) + 1 _col0 (type: string) + input vertices: + 0 Map 1 + Statistics: Num rows: 10000 Data size: 80000 Basic stats: COMPLETE Column stats: PARTIAL + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col0 (type: bigint) + Execution mode: llap + LLAP IO: all inputs + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=10) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary) + Reducer 5 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: EXPLAIN select count(*) from srcpart_small10, srcpart_small, srcpart_date where srcpart_small.key1 = srcpart_small10.key1 and srcpart_date.ds = srcpart_small.ds +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select count(*) from srcpart_small10, srcpart_small, srcpart_date where srcpart_small.key1 = srcpart_small10.key1 and srcpart_date.ds = srcpart_small.ds +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Map 2 (BROADCAST_EDGE) + Map 3 <- Map 1 (BROADCAST_EDGE) + Reducer 4 <- Map 3 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: srcpart_small + filterExpr: key1 is not null (type: boolean) + Statistics: Num rows: 20 Data size: 5420 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: key1 is not null (type: boolean) + Statistics: Num rows: 20 Data size: 5420 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: key1 (type: string), ds (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 20 Data size: 5420 Basic stats: COMPLETE Column stats: PARTIAL + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col1 + input vertices: + 1 Map 2 + Statistics: Num rows: 10 Data size: 1840 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: _col1 (type: string) + sort order: + + Map-reduce partition columns: _col1 (type: string) + Statistics: Num rows: 10 Data size: 1840 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: _col1 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 10 Data size: 1840 Basic stats: COMPLETE Column stats: PARTIAL + Group By Operator + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: PARTIAL + Dynamic Partitioning Event Operator + Target column: ds (string) + Target Input: srcpart_date + Partition key expr: ds + Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: PARTIAL + Target Vertex: Map 3 + Execution mode: llap + LLAP IO: all inputs + Map 2 + Map Operator Tree: + TableScan + alias: srcpart_small10 + filterExpr: key1 is not null (type: boolean) + Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key1 is not null (type: boolean) + Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key1 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 10 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: llap + LLAP IO: no inputs + Map 3 + Map Operator Tree: + TableScan + alias: srcpart_date + filterExpr: ds is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 720000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: ds (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 2000 Data size: 368000 Basic stats: COMPLETE Column stats: COMPLETE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col1 (type: string) + 1 _col0 (type: string) + input vertices: + 0 Map 1 + Statistics: Num rows: 10000 Data size: 80000 Basic stats: COMPLETE Column stats: PARTIAL + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col0 (type: bigint) + Execution mode: llap + LLAP IO: all inputs + Reducer 4 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + PREHOOK: query: explain select * from alltypesorc_int join (select srcpart_date.key as key from srcpart_date union all