diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index cca1055..ddbf39c 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1504,6 +1504,7 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning_2.q,\
   spark_dynamic_partition_pruning_3.q,\
   spark_dynamic_partition_pruning_4.q,\
+  spark_dynamic_partition_pruning_5.q,\
   spark_dynamic_partition_pruning_mapjoin_only.q,\
   spark_constprog_dpp.q,\
   spark_dynamic_partition_pruning_recursive_mapjoin.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index ca19fd0..5b23402 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -21,8 +21,12 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
+import java.util.Set;
+import java.util.function.Function;

 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FilenameUtils;
@@ -211,6 +215,34 @@ public static void collectOp(Collection<Operator<?>> result, Operator<?> root, C
   }

   /**
+   * Collect operators of type T starting from root. Matching operators will be put into result.
+   * The seen set can be used to skip searching certain branches.
+   * The stopAt function can be used to stop the search under certain conditions.
+   */
+  public static <T extends Operator<?>> void collectOp(Operator<?> root, Class<T> cls,
+      Collection<T> result, Set<Operator<?>> seen, Function<Operator<?>, Boolean> stopAt) {
+    if (seen.contains(root) || stopAt.apply(root)) {
+      return;
+    }
+    Deque<Operator<?>> deque = new ArrayDeque<>();
+    deque.add(root);
+    while (!deque.isEmpty()) {
+      Operator<?> op = deque.remove();
+      seen.add(op);
+      if (cls.isInstance(op)) {
+        result.add((T) op);
+      }
+      if (op.getChildOperators() != null) {
+        for (Operator<?> child : op.getChildOperators()) {
+          if (!seen.contains(child) && !stopAt.apply(child)) {
+            deque.add(child);
+          }
+        }
+      }
+    }
+  }
+
+  /**
    * remove currTask from the children of its parentTask
    * remove currTask from the parent of its childrenTask
    * @param currTask
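Note (illustration only, not part of the patch): a minimal sketch of how the new collectOp overload is meant to be called. The someTableScan variable is hypothetical, and the snippet assumes the usual org.apache.hadoop.hive.ql.exec operator classes plus java.util.HashSet are in scope; the stopAt lambda mirrors the "RS - MAPJOIN" boundary check that SparkCompiler uses below, and the shared seen set keeps already-visited branches from being searched twice.

    // Hypothetical usage sketch: gather every SparkPartitionPruningSinkOperator reachable
    // from someTableScan, without descending past a ReduceSinkOperator whose only child is
    // a MapJoinOperator (i.e. the small-table side of a map join, which is a job boundary).
    Set<SparkPartitionPruningSinkOperator> sinks = new HashSet<>();
    Set<Operator<?>> seen = new HashSet<>();
    SparkUtilities.collectOp(someTableScan, SparkPartitionPruningSinkOperator.class, sinks, seen,
        op -> op instanceof ReduceSinkOperator
            && op.getChildOperators() != null
            && op.getChildOperators().size() == 1
            && op.getChildOperators().get(0) instanceof MapJoinOperator);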
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 965044d..3b5c221 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -34,12 +34,14 @@
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -132,6 +134,9 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
     // Remove cyclic dependencies for DPP
     runCycleAnalysisForPartitionPruning(procCtx);

+    // Remove nested DPPs
+    removeNestedDPP(procCtx);
+
     // Re-run constant propagation so we fold any new constants introduced by the operator optimizers
     // Specifically necessary for DPP because we might have created lots of "and true and true" conditions
     if (procCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
@@ -612,4 +617,85 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
     PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE);
     return;
   }
+
+  /** The following is an example of nested DPP:
+   *
+   *           TS            TS
+   *            |             |
+   *           ...           FIL
+   *            |             |  \
+   *           RS            RS  SEL
+   *             \           /    |
+   *    TS        JOIN           GBY
+   *     |       /    \           |
+   *    RS     RS     SEL        DPP2
+   *      \    /       |
+   *       JOIN       GBY
+   *                   |
+   *                  DPP1
+   *
+   * where DPP1 depends on DPP2.
+   *
+   * To avoid such cases, we visit all the branching operators. If a branching operator has any
+   * nested DPP branches further down in its sub-tree, those branches will be removed.
+   * In the above example, the branch of DPP1 will be removed.
+   */
+  private void removeNestedDPP(OptimizeSparkProcContext procContext) {
+    if (!conf.isSparkDPPAny()) {
+      return;
+    }
+    Set<SparkPartitionPruningSinkOperator> allDPPs = new HashSet<>();
+    Set<Operator<?>> seen = new HashSet<>();
+    // collect all DPP sinks
+    for (TableScanOperator root : procContext.getParseContext().getTopOps().values()) {
+      SparkUtilities.collectOp(root, SparkPartitionPruningSinkOperator.class, allDPPs,
+          seen, o -> false);
+    }
+    // collect all branching operators
+    Set<Operator<?>> branchingOps = new HashSet<>();
+    for (SparkPartitionPruningSinkOperator dpp : allDPPs) {
+      branchingOps.add(dpp.getBranchingOp());
+    }
+    // remember the branching ops we have visited
+    Set<Operator<?>> visited = new HashSet<>();
+    for (Operator<?> branchingOp : branchingOps) {
+      if (!visited.contains(branchingOp)) {
+        visited.add(branchingOp);
+        seen.clear();
+        Set<SparkPartitionPruningSinkOperator> nestedDPPs = new HashSet<>();
+        for (Operator<?> branch : branchingOp.getChildOperators()) {
+          if (!isDirectDPPBranch(branch)) {
+            SparkUtilities.collectOp(branch, SparkPartitionPruningSinkOperator.class, nestedDPPs,
+                seen, this::stopAtMJ);
+          }
+        }
+        for (SparkPartitionPruningSinkOperator nestedDPP : nestedDPPs) {
+          visited.add(nestedDPP.getBranchingOp());
+          OperatorUtils.removeBranch(nestedDPP);
+        }
+      }
+    }
+  }
+
+  // whether the branch is of the pattern "SEL - GBY - DPP"
+  private boolean isDirectDPPBranch(Operator<?> op) {
+    if (op instanceof SelectOperator && op.getChildOperators() != null
+        && op.getChildOperators().size() == 1) {
+      op = op.getChildOperators().get(0);
+      if (op instanceof GroupByOperator && op.getChildOperators() != null
+          && op.getChildOperators().size() == 1) {
+        op = op.getChildOperators().get(0);
+        return op instanceof SparkPartitionPruningSinkOperator;
+      }
+    }
+    return false;
+  }
+
+  // If a branch is of the pattern "RS - MAPJOIN", it means we're on the "small table" side of a
+  // map join. Since there will be a job boundary, we shouldn't look for DPPs beyond this.
+  private boolean stopAtMJ(Operator<?> op) {
+    return op instanceof ReduceSinkOperator && op.getChildOperators() != null
+        && op.getChildOperators().size() == 1 && op.getChildOperators()
+        .get(0) instanceof MapJoinOperator;
+  }
 }
diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_5.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_5.q
new file mode 100644
index 0000000..3c1e4f5
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_5.q
@@ -0,0 +1,14 @@
+set hive.spark.dynamic.partition.pruning=true;
+
+-- This qfile tests whether we can handle nested DPP sinks
+
+create table part1(key string, value string) partitioned by (p string, q string);
+insert into table part1 partition (p='1', q='1') values ('1','1'), ('2','2');
+
+create table part2(key string, value string) partitioned by (p string, q string);
+insert into table part2 partition (p='3', q='3') values ('a','a'), ('b','b');
+
+explain select * from src join part1 on src.key=part1.p join part2 on src.value=part2.q;
+
+drop table part1;
+drop table part2;
\ No newline at end of file
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_5.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_5.q.out
new file mode 100644
index 0000000..9f99d6d
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_5.q.out
@@ -0,0 +1,184 @@
+PREHOOK: query: create table part1(key string, value string) partitioned by (p string, q string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@part1
+POSTHOOK: query: create table part1(key string, value string) partitioned by (p string, q string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@part1
+PREHOOK: query: insert into table part1 partition (p='1', q='1') values ('1','1'), ('2','2')
+PREHOOK: type: QUERY
+PREHOOK: Output: default@part1@p=1/q=1
+POSTHOOK: query: insert into table part1 partition (p='1', q='1') values ('1','1'), ('2','2')
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@part1@p=1/q=1
+POSTHOOK: Lineage: part1 PARTITION(p=1,q=1).key SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: part1 PARTITION(p=1,q=1).value SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: create table part2(key string, value string) partitioned by (p string, q string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@part2
+POSTHOOK: query: create table part2(key string, value string) partitioned by (p string, q string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@part2
+PREHOOK: query: insert into table part2 partition (p='3', q='3') values ('a','a'), ('b','b')
+PREHOOK: type: QUERY
+PREHOOK: Output: default@part2@p=3/q=3
+POSTHOOK: query: insert into table part2 partition (p='3', q='3') values ('a','a'), ('b','b')
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@part2@p=3/q=3
+POSTHOOK: Lineage: part2 PARTITION(p=3,q=3).key SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: part2 PARTITION(p=3,q=3).value SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: explain select * from src join part1 on src.key=part1.p join part2 on src.value=part2.q
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from src join part1 on src.key=part1.p join part2 on src.value=part2.q
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 6
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            Target column: [4:p (string)]
+                            partition key expr: [p]
+                            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                            target works: [Map 4]
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 4), Map 4 (PARTITION-LEVEL SORT, 4)
+        Reducer 3 <- Map 5 (PARTITION-LEVEL SORT, 4), Reducer 2 (PARTITION-LEVEL SORT, 4)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
+        Map 4
+            Map Operator Tree:
+                TableScan
+                  alias: part1
+                  Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string), p (type: string), q (type: string)
+                    outputColumnNames: _col0, _col1, _col2, _col3
+                    Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col2 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col2 (type: string)
+                      Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string), _col1 (type: string), _col3 (type: string)
+        Map 5
+            Map Operator Tree:
+                TableScan
+                  alias: part2
+                  Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string), p (type: string), q (type: string)
+                    outputColumnNames: _col0, _col1, _col2, _col3
+                    Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col3 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col3 (type: string)
+                      Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string)
+        Reducer 2
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col2 (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+                Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col1 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col1 (type: string)
+                  Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string)
+        Reducer 3
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col1 (type: string)
+                  1 _col3 (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
+                Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: drop table part1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@part1
+PREHOOK: Output: default@part1
+POSTHOOK: query: drop table part1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@part1
+POSTHOOK: Output: default@part1
+PREHOOK: query: drop table part2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@part2
+PREHOOK: Output: default@part2
+POSTHOOK: query: drop table part2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@part2
+POSTHOOK: Output: default@part2