commit 7349680ce0ce5ed29291f38cf2bbc7a8c2f73e1f Author: Janaki Lahorani Date: Mon Jul 24 14:25:52 2017 -0700 HIVE-16998: Add parameter that limits DPP to map joins Change-Id: I06c611e0690855b61a63afe140edbe2f122771cb diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index df45f2cc32fa078e773bf1b4ed2acb260176da94..329db6165795aba57d1809f12e6c3b6ccb85b159 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3406,6 +3406,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE( "hive.spark.dynamic.partition.pruning.max.data.size", 100*1024*1024L, "Maximum total data size in dynamic pruning."), + SPARK_DYNAMIC_PARTITION_PRUNING_MAP_JOIN_ONLY( + "hive.spark.dynamic.partition.pruning.map.join.only", false, + "Turn on dynamic partition pruning only for map joins."), SPARK_USE_GROUPBY_SHUFFLE( "hive.spark.use.groupby.shuffle", true, "Spark groupByKey transformation has better performance but uses unbounded memory." 
+ diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java index 26a108830eeed02f0b35750526e18731634a2566..82facbe284f728f92c35ad12de23268ea5f42da3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java @@ -53,7 +53,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, SparkPartitionPruningSinkDesc desc = op.getConf(); if (desc.getStatistics().getDataSize() > context.getConf() - .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) { + .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE) || + (context.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAP_JOIN_ONLY) && + context.getMjOpSizes().isEmpty())) { OperatorUtils.removeBranch(op); // at this point we've found the fork in the op pipeline that has the pruning as a child plan. LOG.info("Disabling dynamic pruning for: " diff --git ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning.q ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning.q index 3b6810bcf75a56d9590494c18d5cc4aec88af3a8..d4fda37137b7531c7c8bd6d0003143df9d73595f 100644 --- ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning.q +++ ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning.q @@ -172,6 +172,23 @@ where srcpart_date.`date` = '2008-04-08' and srcpart.hr = 13; EXPLAIN select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart); select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart); +-- test Set dpp map join only to true. 
DPP will be enabled only for map joins +set hive.spark.dynamic.partition.pruning.map.join.only=true; +-- the following query will use map join and dpp will be enabled +EXPLAIN select * + from srcpart, srcpart_date + where srcpart.ds = srcpart_date.ds + and srcpart_date.date = '2008-04-08'; +-- disable map join +set hive.auto.convert.join=false; +-- since map join is disabled, dpp will be disabled too +EXPLAIN select * + from srcpart, srcpart_date + where srcpart.ds = srcpart_date.ds + and srcpart_date.date = '2008-04-08'; +-- Reset to default for any tests that might get added after this +set hive.spark.dynamic.partition.pruning.map.join.only=false; +set hive.auto.convert.join=true; drop table srcpart_orc; drop table srcpart_date; diff --git ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out index d47617229206e8132081304a5748543bd538899e..db87d83117998765f0473709ac7e9f2f07bb6ed5 100644 --- ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out +++ ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out @@ -5788,6 +5788,178 @@ POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 #### A masked pattern was here #### 2008-04-08 2008-04-09 +PREHOOK: query: EXPLAIN select * + from srcpart, srcpart_date + where srcpart.ds = srcpart_date.ds + and srcpart_date.date = '2008-04-08' +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select * + from srcpart, srcpart_date + where srcpart.ds = srcpart_date.ds + and srcpart_date.date = '2008-04-08' +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-1 depends on stages: Stage-2 + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-2 + Spark +#### A masked pattern was here #### + Vertices: + Map 2 + Map Operator Tree: + TableScan + alias: srcpart_date + filterExpr: (ds is not null and (date = '2008-04-08')) (type: boolean) + Statistics: Num
rows: 2 Data size: 42 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (ds is not null and (date = '2008-04-08')) (type: boolean) + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Spark HashTable Sink Operator + keys: + 0 ds (type: string) + 1 ds (type: string) + Select Operator + expressions: ds (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Spark Partition Pruning Sink Operator + partition key expr: ds +#### A masked pattern was here #### + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + target column name: ds + target work: Map 1 + Local Work: + Map Reduce Local Work + + Stage: Stage-1 + Spark +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: srcpart + filterExpr: ds is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 ds (type: string) + 1 ds (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col7 + input vertices: + 1 Map 2 + Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (_col2 = _col7) (type: boolean) + Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col7 (type: string), '2008-04-08' (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5 + Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 
1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: EXPLAIN select * + from srcpart, srcpart_date + where srcpart.ds = srcpart_date.ds + and srcpart_date.date = '2008-04-08' +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select * + from srcpart, srcpart_date + where srcpart.ds = srcpart_date.ds + and srcpart_date.date = '2008-04-08' +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 3 (PARTITION-LEVEL SORT, 2) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: srcpart + filterExpr: ds is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: ds (type: string) + sort order: + + Map-reduce partition columns: ds (type: string) + Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE + value expressions: key (type: string), value (type: string), hr (type: string) + Map 3 + Map Operator Tree: + TableScan + alias: srcpart_date + filterExpr: (ds is not null and (date = '2008-04-08')) (type: boolean) + Statistics: Num rows: 2 Data size: 42 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (ds is not null and (date = '2008-04-08')) (type: boolean) + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: ds (type: string) + sort order: + + Map-reduce partition columns: ds (type: string) + Statistics: Num rows: 1 
Data size: 21 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 ds (type: string) + 1 ds (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col7 + Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (_col2 = _col7) (type: boolean) + Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col7 (type: string), '2008-04-08' (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5 + Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + PREHOOK: query: drop table srcpart_orc PREHOOK: type: DROPTABLE POSTHOOK: query: drop table srcpart_orc