diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 76d18ed72d..e0d73d407d 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1507,6 +1507,7 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning_2.q,\
   spark_dynamic_partition_pruning_3.q,\
   spark_dynamic_partition_pruning_4.q,\
+  spark_dynamic_partition_pruning_5.q,\
   spark_dynamic_partition_pruning_mapjoin_only.q,\
   spark_constprog_dpp.q,\
   spark_dynamic_partition_pruning_recursive_mapjoin.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index ca19fd013c..07a5e45700 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -21,8 +21,11 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
+import java.util.Set;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FilenameUtils;
@@ -210,6 +213,33 @@ public static void collectOp(Collection<Operator<?>> result, Operator<?> root, C
     }
   }
 
+  /**
+   * Collect operators of type T starting from root. Matching operators will be put into result.
+   * Set seen can be used to skip search in certain branches.
+   */
+  public static <T extends Operator<?>> void collectOp(Operator<?> root, Class<T> cls,
+      Collection<T> result, Set<Operator<?>> seen) {
+    if (seen.contains(root)) {
+      return;
+    }
+    Deque<Operator<?>> deque = new ArrayDeque<>();
+    deque.add(root);
+    while (!deque.isEmpty()) {
+      Operator<?> op = deque.remove();
+      seen.add(op);
+      if (cls.isInstance(op)) {
+        result.add((T) op);
+      }
+      if (op.getChildOperators() != null) {
+        for (Operator<?> child : op.getChildOperators()) {
+          if (!seen.contains(child)) {
+            deque.add(child);
+          }
+        }
+      }
+    }
+  }
+
   /**
    * remove currTask from the children of its parentTask
    * remove currTask from the parent of its childrenTask
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 965044d925..151c6fb8ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -34,12 +34,14 @@
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -132,6 +134,9 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
     // Remove cyclic dependencies for DPP
     runCycleAnalysisForPartitionPruning(procCtx);
 
+    // Remove nested DPPs
+    removeNestedDPP(procCtx);
+
     // Re-run constant propagation so we fold any new constants introduced by the operator optimizers
     // Specifically necessary for DPP because we might have created lots of "and true and true" conditions
     if (procCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
@@ -612,4 +617,82 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
     PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE);
     return;
   }
+
+  /** For DPP sinks w/ common join, we'll split the tree and what's above the branching
+   * operator is computed multiple times. Therefore it may not be good for performance to support
+   * nested DPP sinks, i.e. one DPP sink depends on other DPP sinks.
+   * The following is an example:
+   *
+   *             TS          TS
+   *              |           |
+   *             ...         FIL
+   *              |           |  \
+   *             RS          RS  SEL
+   *               \         /    |
+   *      TS         JOIN        GBY
+   *       |        /    \        |
+   *       RS      RS    SEL     DPP2
+   *        \     /       |
+   *         JOIN        GBY
+   *                      |
+   *                     DPP1
+   *
+   * where DPP1 depends on DPP2.
+   *
+   * To avoid such case, we'll visit all the branching operators. If a branching operator has any
+   * further away DPP branches w/ common join in its sub-tree, such branches will be removed.
+   * In the above example, the branch of DPP1 will be removed.
+   */
+  private void removeNestedDPP(OptimizeSparkProcContext procContext) {
+    if (!conf.isSparkDPPAny()) {
+      return;
+    }
+    Set<SparkPartitionPruningSinkOperator> allDPPs = new HashSet<>();
+    Set<Operator<?>> seen = new HashSet<>();
+    // collect all DPP sinks
+    for (TableScanOperator root : procContext.getParseContext().getTopOps().values()) {
+      SparkUtilities.collectOp(root, SparkPartitionPruningSinkOperator.class, allDPPs, seen);
+    }
+    // collect all branching operators
+    Set<Operator<?>> branchingOps = new HashSet<>();
+    for (SparkPartitionPruningSinkOperator dpp : allDPPs) {
+      branchingOps.add(dpp.getBranchingOp());
+    }
+    // remember the branching ops we have visited
+    Set<Operator<?>> visited = new HashSet<>();
+    for (Operator<?> branchingOp : branchingOps) {
+      if (!visited.contains(branchingOp)) {
+        visited.add(branchingOp);
+        seen.clear();
+        Set<SparkPartitionPruningSinkOperator> nestedDPPs = new HashSet<>();
+        for (Operator<?> branch : branchingOp.getChildOperators()) {
+          if (!isDirectDPPBranch(branch)) {
+            SparkUtilities.collectOp(branch, SparkPartitionPruningSinkOperator.class, nestedDPPs,
+                seen);
+          }
+        }
+        for (SparkPartitionPruningSinkOperator nestedDPP : nestedDPPs) {
+          visited.add(nestedDPP.getBranchingOp());
+          // if a DPP is with MJ, the tree won't be split and so we don't have to remove it
+          if (!nestedDPP.isWithMapjoin()) {
+            OperatorUtils.removeBranch(nestedDPP);
+          }
+        }
+      }
+    }
+  }
+
+  // whether of pattern "SEL - GBY - DPP"
+  private boolean isDirectDPPBranch(Operator<?> op) {
+    if (op instanceof SelectOperator && op.getChildOperators() != null
+        && op.getChildOperators().size() == 1) {
+      op = op.getChildOperators().get(0);
+      if (op instanceof GroupByOperator && op.getChildOperators() != null
+          && op.getChildOperators().size() == 1) {
+        op = op.getChildOperators().get(0);
+        return op instanceof SparkPartitionPruningSinkOperator;
+      }
+    }
+    return false;
+  }
 }
diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_5.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_5.q
new file mode 100644
index 0000000000..488378776e
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_5.q
@@ -0,0 +1,24 @@
+set hive.spark.dynamic.partition.pruning=true;
+
+-- This qfile tests whether we can handle nested DPP sinks
+
+create table part1(key string, value string) partitioned by (p string);
+insert into table part1 partition (p='1') select * from src;
+
+create table part2(key string, value string) partitioned by (p string);
+insert into table part2 partition (p='1') select * from src;
+
+create table regular1 as select * from src limit 2;
+
+-- nested DPP is removed, upper most DPP is w/ common join
+explain select * from src join part1 on src.key=part1.p join part2 on src.value=part2.p;
+
+-- nested DPP is removed, upper most DPP is w/ map join
+set hive.auto.convert.join=true;
+-- ensure regular1 is treated as small table, and partitioned tables are not
+set hive.auto.convert.join.noconditionaltask.size=20;
+explain select * from regular1 join part1 on regular1.key=part1.p join part2 on regular1.value=part2.p;
+
+drop table part1;
+drop table part2;
+drop table regular1;
\ No newline at end of file
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_5.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_5.q.out
new file mode 100644
index 0000000000..189a43bd15
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_5.q.out
@@ -0,0 +1,335 @@
+PREHOOK: query: create table part1(key string, value string) partitioned by (p string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@part1
+POSTHOOK: query: create table part1(key string, value string) partitioned by (p string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@part1
+PREHOOK: query: insert into table part1 partition (p='1') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@part1@p=1
+POSTHOOK: query: insert into table part1 partition (p='1') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@part1@p=1
+POSTHOOK: Lineage: part1 PARTITION(p=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: part1 PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: create table part2(key string, value string) partitioned by (p string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@part2
+POSTHOOK: query: create table part2(key string, value string) partitioned by (p string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@part2
+PREHOOK: query: insert into table part2 partition (p='1') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@part2@p=1
+POSTHOOK: query: insert into table part2 partition (p='1') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@part2@p=1
+POSTHOOK: Lineage: part2 PARTITION(p=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: part2 PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: create table regular1 as select * from src limit 2
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@regular1
+POSTHOOK: query: create table regular1 as select * from src limit 2
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@regular1
+POSTHOOK: Lineage: regular1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: regular1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: explain select * from src join part1 on src.key=part1.p join part2 on src.value=part2.p
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from src join part1 on src.key=part1.p join part2 on src.value=part2.p
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 6
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            Target column: [1:p (string)]
+                            partition key expr: [p]
+                            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                            target works: [Map 1]
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 4), Map 4 (PARTITION-LEVEL SORT, 4)
+        Reducer 3 <- Map 5 (PARTITION-LEVEL SORT, 4), Reducer 2 (PARTITION-LEVEL SORT, 4)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: part1
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string), p (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col2 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col2 (type: string)
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string), _col1 (type: string)
+        Map 4
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
+        Map 5
+            Map Operator Tree:
+                TableScan
+                  alias: part2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string), p (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col2 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col2 (type: string)
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string), _col1 (type: string)
+        Reducer 2
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col2 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col4 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col4 (type: string)
+                  Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
+        Reducer 3
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col4 (type: string)
+                  1 _col2 (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
+                Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col3 (type: string), _col4 (type: string), _col0 (type: string), _col1 (type: string), _col2 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
+                  Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: explain select * from regular1 join part1 on regular1.key=part1.p join part2 on regular1.value=part2.p
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from regular1 join part1 on regular1.key=part1.p join part2 on regular1.value=part2.p
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3
+            Map Operator Tree:
+                TableScan
+                  alias: regular1
+                  Statistics: Num rows: 2 Data size: 20 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 2 Data size: 20 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 2 Data size: 20 Basic stats: COMPLETE Column stats: NONE
+                      Spark HashTable Sink Operator
+                        keys:
+                          0 _col2 (type: string)
+                          1 _col0 (type: string)
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 2 Data size: 20 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 2 Data size: 20 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            Target column: [1:p (string)]
+                            partition key expr: [p]
+                            Statistics: Num rows: 2 Data size: 20 Basic stats: COMPLETE Column stats: NONE
+                            target works: [Map 1]
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 4), Map 4 (PARTITION-LEVEL SORT, 4)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: part1
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string), p (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      keys:
+                        0 _col2 (type: string)
+                        1 _col0 (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                      input vertices:
+                        1 Map 3
+                      Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col4 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col4 (type: string)
+                        Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
+            Local Work:
+              Map Reduce Local Work
+        Map 4
+            Map Operator Tree:
+                TableScan
+                  alias: part2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string), p (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col2 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col2 (type: string)
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string), _col1 (type: string)
+        Reducer 2
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col4 (type: string)
+                  1 _col2 (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
+                Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col3 (type: string), _col4 (type: string), _col0 (type: string), _col1 (type: string), _col2 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
+                  Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: drop table part1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@part1
+PREHOOK: Output: default@part1
+POSTHOOK: query: drop table part1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@part1
+POSTHOOK: Output: default@part1
+PREHOOK: query: drop table part2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@part2
+PREHOOK: Output: default@part2
+POSTHOOK: query: drop table part2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@part2
+POSTHOOK: Output: default@part2
+PREHOOK: query: drop table regular1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@regular1
+PREHOOK: Output: default@regular1
+POSTHOOK: query: drop table regular1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@regular1
+POSTHOOK: Output: default@regular1
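
A note on the traversal introduced in SparkUtilities.collectOp above: it is a breadth-first walk that shares a "seen" set across calls, so operator branches reachable from several roots are visited only once and certain branches (e.g. direct SEL-GBY-DPP branches in removeNestedDPP) can be excluded up front. The following stand-alone sketch illustrates the same pattern; the Node/Sink classes and CollectOpSketch file are hypothetical stand-ins, not Hive API, and the sketch is not part of the patch.

// Illustrative sketch only (not part of the patch above): the same "BFS with a shared seen set"
// idea that the new SparkUtilities.collectOp uses, written against a hypothetical Node tree.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CollectOpSketch {

  // Minimal stand-in for Hive's Operator<?>; only what the traversal needs.
  static class Node {
    final String name;
    final List<Node> children = new ArrayList<>();
    Node(String name) { this.name = name; }
    Node add(Node child) { children.add(child); return child; }
  }

  // Marker subtype playing the role of SparkPartitionPruningSinkOperator in this sketch.
  static class Sink extends Node {
    Sink(String name) { super(name); }
  }

  // Collect nodes of type T reachable from root, skipping anything already in 'seen'.
  static <T extends Node> void collect(Node root, Class<T> cls, Collection<T> result, Set<Node> seen) {
    if (seen.contains(root)) {
      return;
    }
    Deque<Node> deque = new ArrayDeque<>();
    deque.add(root);
    while (!deque.isEmpty()) {
      Node node = deque.remove();
      seen.add(node);               // remember visited nodes so shared branches are walked once
      if (cls.isInstance(node)) {
        result.add(cls.cast(node));
      }
      for (Node child : node.children) {
        if (!seen.contains(child)) {
          deque.add(child);
        }
      }
    }
  }

  public static void main(String[] args) {
    // TS -> JOIN -> {DPP1, RS -> DPP2}: two sinks reachable from one root.
    Node ts = new Node("TS");
    Node join = ts.add(new Node("JOIN"));
    join.add(new Sink("DPP1"));
    join.add(new Node("RS")).add(new Sink("DPP2"));

    Set<Node> seen = new HashSet<>();
    List<Sink> sinks = new ArrayList<>();
    collect(ts, Sink.class, sinks, seen);
    System.out.println("found " + sinks.size() + " sinks"); // prints: found 2 sinks
  }
}

In the patch itself, removeNestedDPP clears and reuses one seen set per branching operator, which is why direct SEL-GBY-DPP children are filtered out before the nested-sink search runs.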