diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 5ab3076..23ca17f 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1370,6 +1370,7 @@ spark.query.files=add_part_multiple.q, \
 spark.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning.q,\
   spark_dynamic_partition_pruning_2.q,\
+  multi_insert_parallel_orderby.q,\
   spark_explainuser_1.q,\
   spark_vectorized_dynamic_partition_pruning.q,\
   spark_use_file_size_for_mapjoin.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index 337f418..dfb4f79 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -22,6 +22,8 @@ import java.util.Set;
 import java.util.Stack;
 
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -202,20 +204,33 @@ private boolean needSetParallelism(ReduceSinkOperator reduceSink, HiveConf hiveC
     }
     if (desc.getNumReducers() == 1 && desc.hasOrderBy() &&
         hiveConf.getBoolVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBY) && !desc.isDeduplicated()) {
+      Stack<Operator<? extends OperatorDesc>> descendants = new Stack<Operator<? extends OperatorDesc>>();
       List<Operator<? extends OperatorDesc>> children = reduceSink.getChildOperators();
-      while (children != null && children.size() > 0) {
-        if (children.size() != 1 || children.get(0) instanceof LimitOperator) {
+      if (children != null) {
+        for (Operator<? extends OperatorDesc> child : children) {
+          descendants.push(child);
+        }
+      }
+      while (descendants.size() != 0) {
+        Operator<? extends OperatorDesc> descendant = descendants.pop();
+        // If any descendant is a LimitOperator, return false
+        if (descendant instanceof LimitOperator) {
           return false;
         }
-        if (children.get(0) instanceof ReduceSinkOperator ||
-            children.get(0) instanceof FileSinkOperator) {
-          break;
+        boolean reachTerminalOperator = (descendant instanceof TerminalOperator);
+        if (!reachTerminalOperator) {
+          List<Operator<? extends OperatorDesc>> childrenOfDescendant = descendant.getChildOperators();
+          if (childrenOfDescendant != null) {
+            for (Operator<? extends OperatorDesc> childOfDescendant : childrenOfDescendant) {
+              descendants.push(childOfDescendant);
+            }
+          }
         }
-        children = children.get(0).getChildOperators();
       }
       return true;
     }
     return false;
+
   }
 
   private void getSparkMemoryAndCores(OptimizeSparkProcContext context) throws SemanticException {
diff --git a/ql/src/test/queries/clientpositive/multi_insert_parallel_orderby.q b/ql/src/test/queries/clientpositive/multi_insert_parallel_orderby.q
new file mode 100644
index 0000000..9963050
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/multi_insert_parallel_orderby.q
@@ -0,0 +1,34 @@
+set hive.mapred.mode=nonstrict;
+set hive.exec.reducers.bytes.per.reducer=256;
+set hive.optimize.sampling.orderby=true;
+
+-- SORT_QUERY_RESULTS
+
+create table e1 (key string, value string);
+create table e2 (key string);
+
+--test orderby+limit case
+explain
+select key,value from src order by key limit 10;
+select key,value from src order by key limit 10;
+
+
+--test orderby+limit+multi_insert case
+explain
+FROM (select key,value from src order by key limit 10) a
+INSERT OVERWRITE TABLE e1
+  SELECT key, value
+INSERT OVERWRITE TABLE e2
+  SELECT key;
+
+FROM (select key,value from src order by key limit 10) a
+INSERT OVERWRITE TABLE e1
+  SELECT key, value
+INSERT OVERWRITE TABLE e2
+  SELECT key;
+
+select * from e1;
+select * from e2;
+
+drop table e1;
+drop table e2;
diff --git a/ql/src/test/results/clientpositive/spark/multi_insert_parallel_orderby.q.out b/ql/src/test/results/clientpositive/spark/multi_insert_parallel_orderby.q.out
new file mode 100644
index 0000000..5472aca
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/multi_insert_parallel_orderby.q.out
@@ -0,0 +1,261 @@
+PREHOOK: query: create table e1 (key string, value string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@e1
+POSTHOOK: query: create table e1 (key string, value string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@e1
+PREHOOK: query: create table e2 (key string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@e2
+POSTHOOK: query: create table e2 (key string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@e2
+PREHOOK: query: explain
+select key,value from src order by key limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select key,value from src order by key limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (SORT, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: string)
+                      sort order: +
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      TopN Hash Memory Usage: 0.1
+                      value expressions: _col1 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                Limit
+                  Number of rows: 10
+                  Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select key,value from src order by key limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select key,value from src order by key limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0	val_0
+0	val_0
+0	val_0
+10	val_10
+100	val_100
+100	val_100
+103	val_103
+103	val_103
+104	val_104
+104	val_104
+PREHOOK: query: explain
+FROM (select key,value from src order by key limit 10) a
+INSERT OVERWRITE TABLE e1
+  SELECT key, value
+INSERT OVERWRITE TABLE e2
+  SELECT key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+FROM (select key,value from src order by key limit 10) a
+INSERT OVERWRITE TABLE e1
+  SELECT key, value
+INSERT OVERWRITE TABLE e2
+  SELECT key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+  Stage-1 depends on stages: Stage-2
+  Stage-4 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (SORT, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: string)
+                      sort order: +
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      TopN Hash Memory Usage: 0.1
+                      value expressions: _col1 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                Limit
+                  Number of rows: 10
+                  Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                        name: default.e1
+                  Select Operator
+                    expressions: _col0 (type: string)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                          name: default.e2
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.e1
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+  Stage: Stage-1
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.e2
+
+  Stage: Stage-4
+    Stats-Aggr Operator
+
+PREHOOK: query: FROM (select key,value from src order by key limit 10) a
+INSERT OVERWRITE TABLE e1
+  SELECT key, value
+INSERT OVERWRITE TABLE e2
+  SELECT key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@e1
+PREHOOK: Output: default@e2
+POSTHOOK: query: FROM (select key,value from src order by key limit 10) a
+INSERT OVERWRITE TABLE e1
+  SELECT key, value
+INSERT OVERWRITE TABLE e2
+  SELECT key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@e1
+POSTHOOK: Output: default@e2
+POSTHOOK: Lineage: e1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: e1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: e2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: select * from e1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@e1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from e1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@e1
+#### A masked pattern was here ####
+0	val_0
+0	val_0
+0	val_0
+10	val_10
+100	val_100
+100	val_100
+103	val_103
+103	val_103
+104	val_104
+104	val_104
+PREHOOK: query: select * from e2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@e2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from e2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@e2
+#### A masked pattern was here ####
+0
+0
+0
+10
+100
+100
+103
+103
+104
+104
+PREHOOK: query: drop table e1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@e1
+PREHOOK: Output: default@e1
+POSTHOOK: query: drop table e1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@e1
+POSTHOOK: Output: default@e1
+PREHOOK: query: drop table e2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@e2
+PREHOOK: Output: default@e2
+POSTHOOK: query: drop table e2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@e2
+POSTHOOK: Output: default@e2