diff --git itests/pom.xml itests/pom.xml index e5b54bf..a89d9ad 100644 --- itests/pom.xml +++ itests/pom.xml @@ -82,19 +82,6 @@ finalName=$2 tarName=$(basename $url) rm -rf $BASE_DIR/$finalName - if [[ ! -f $DOWNLOAD_DIR/$tarName ]] - then - curl -Sso $DOWNLOAD_DIR/$tarName $url - else - local md5File="$tarName".md5sum - curl -Sso $DOWNLOAD_DIR/$md5File "$url".md5sum - cd $DOWNLOAD_DIR - if ! md5sum -c $md5File; then - curl -Sso $DOWNLOAD_DIR/$tarName $url || return 1 - fi - - cd - - fi tar -zxf $DOWNLOAD_DIR/$tarName -C $BASE_DIR mv $BASE_DIR/spark-${spark.version}-bin-hadoop2-without-hive $BASE_DIR/$finalName } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java index 76aa39f..abad9c7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java @@ -172,7 +172,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // unlink connection between FS and its parent fsParent = fsOp.getParentOperators().get(0); - fsParent.getChildOperators().clear(); + fsParent.getChildOperators().remove(fsOp); DynamicPartitionCtx dpCtx = fsOp.getConf().getDynPartCtx(); int numBuckets = destTable.getNumBuckets(); @@ -364,7 +364,7 @@ private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) { rsChild.getSchema().getSignature().size()) { return false; } - rsParent.getChildOperators().clear(); + rsParent.getChildOperators().remove(rsToRemove); rsParent.getChildOperators().add(rsGrandChild); rsGrandChild.getParentOperators().clear(); rsGrandChild.getParentOperators().add(rsParent); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java index 2b075be..8ca3725 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java @@ -530,6 +530,23 @@ protected Integer checkNumReducer(int creduce, int preduce) { return 0; } + // Check that in the path between cRS and pRS, there are only Select operators + // i.e. the sequence must be pRS-SEL*-cRS + // ensure SEL does not branch + protected boolean checkSelectSingleBranchOnly(ReduceSinkOperator cRS, ReduceSinkOperator pRS) { + Operator parent = cRS.getParentOperators().get(0); + while (parent != pRS) { + assert parent.getNumParent() == 1; + if (!(parent instanceof SelectOperator)) { + return false; + } else if (parent.getChildOperators().size() > 1) { + return false; + } + parent = parent.getParentOperators().get(0); + } + return true; + } + protected boolean aggressiveDedup(ReduceSinkOperator cRS, ReduceSinkOperator pRS, ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException { assert cRS.getNumParent() == 1; @@ -539,15 +556,8 @@ protected boolean aggressiveDedup(ReduceSinkOperator cRS, ReduceSinkOperator pRS List cKeys = cConf.getKeyCols(); List pKeys = pConf.getKeyCols(); - // Check that in the path between cRS and pRS, there are only Select operators - // i.e. the sequence must be pRS-SEL*-cRS - Operator parent = cRS.getParentOperators().get(0); - while (parent != pRS) { - assert parent.getNumParent() == 1; - if (!(parent instanceof SelectOperator)) { - return false; - } - parent = parent.getParentOperators().get(0); + if (!checkSelectSingleBranchOnly(cRS, pRS)) { + return false; } // If child keys are null or empty, we bail out @@ -619,7 +629,7 @@ protected boolean aggressiveDedup(ReduceSinkOperator cRS, ReduceSinkOperator pRS // Replace pRS with cRS and remove operator sequence from pRS to cRS // Recall that the sequence must be pRS-SEL*-cRS - parent = cRS.getParentOperators().get(0); + Operator parent = cRS.getParentOperators().get(0); while (parent != pRS) { dedupCtx.addRemovedOperator(parent); parent = parent.getParentOperators().get(0); @@ -743,6 +753,9 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup CorrelationUtilities.findPossibleParent( cRS, ReduceSinkOperator.class, dedupCtx.trustScript()); if (pRS != null) { + if (!checkSelectSingleBranchOnly(cRS, pRS)) { + return false; + } // Try extended deduplication if (aggressiveDedup(cRS, pRS, dedupCtx)) { return true; diff --git ql/src/test/queries/clientpositive/dynpart_sort_optimization.q ql/src/test/queries/clientpositive/dynpart_sort_optimization.q index 5ef8ead..39adb5b 100644 --- ql/src/test/queries/clientpositive/dynpart_sort_optimization.q +++ ql/src/test/queries/clientpositive/dynpart_sort_optimization.q @@ -208,3 +208,45 @@ insert overwrite table over1k_part3 partition(s,t,i) select si,b,f,s,t,i from ov insert overwrite table over1k_part3 partition(s,t,i) select si,b,f,s,t,i from over1k where i=100 and t=27 and s="foo"; select sum(hash(*)) from over1k_part3; + + +-- verify multiple branches in SDPO +set hive.optimize.sort.dynamic.partition=true; +set hive.optimize.reducededuplication.min.reducer=1; +-- CBO will remove the intermediate ORDER BY +set hive.cbo.enable=true; + +create table over1k_part4_0(i int) + partitioned by (s string); +create table over1k_part4_1(i int) + partitioned by (s string); + +EXPLAIN +WITH CTE AS ( + select i, s from over1k where s like 'bob%' +) +FROM ( + select * from CTE where i > 1 ORDER BY s +) src1k +insert overwrite table over1k_part4_0 partition(s) +select i+1, s +insert overwrite table over1k_part4_1 partition(s) +select i+0, s +; + +EXPLAIN +WITH CTE AS ( + select i, s from over1k where s like 'bob%' +) +FROM ( + select * from CTE where i > 1 ORDER BY s +) src1k +insert overwrite table over1k_part4_0 partition(s) +select i+1, s +insert overwrite table over1k_part4_1 partition(s) +select i+0, s +; + +select count(1) from over1k_part4_0; +select count(1) from over1k_part4_1; + diff --git ql/src/test/results/clientpositive/llap/dynpart_sort_optimization.q.out ql/src/test/results/clientpositive/llap/dynpart_sort_optimization.q.out index 1ef0740..9be67a3 100644 --- ql/src/test/results/clientpositive/llap/dynpart_sort_optimization.q.out +++ ql/src/test/results/clientpositive/llap/dynpart_sort_optimization.q.out @@ -3269,3 +3269,349 @@ POSTHOOK: Input: default@over1k_part3@s=wendy van buren/t=27/i=65680 POSTHOOK: Input: default@over1k_part3@s=xavier quirinius/t=27/i=65599 #### A masked pattern was here #### 17814641134 +PREHOOK: query: create table over1k_part4_0(i int) + partitioned by (s string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@over1k_part4_0 +POSTHOOK: query: create table over1k_part4_0(i int) + partitioned by (s string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@over1k_part4_0 +PREHOOK: query: create table over1k_part4_1(i int) + partitioned by (s string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@over1k_part4_1 +POSTHOOK: query: create table over1k_part4_1(i int) + partitioned by (s string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@over1k_part4_1 +PREHOOK: query: EXPLAIN +WITH CTE AS ( + select i, s from over1k where s like 'bob%' +) +FROM ( + select * from CTE where i > 1 ORDER BY s +) src1k +insert overwrite table over1k_part4_0 partition(s) +select i+1, s +insert overwrite table over1k_part4_1 partition(s) +select i+0, s +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +WITH CTE AS ( + select i, s from over1k where s like 'bob%' +) +FROM ( + select * from CTE where i > 1 ORDER BY s +) src1k +insert overwrite table over1k_part4_0 partition(s) +select i+1, s +insert overwrite table over1k_part4_1 partition(s) +select i+0, s +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-3 depends on stages: Stage-2 + Stage-0 depends on stages: Stage-3 + Stage-4 depends on stages: Stage-0 + Stage-1 depends on stages: Stage-3 + Stage-5 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-2 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE) + Reducer 4 <- Reducer 2 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: over1k + Statistics: Num rows: 1025 Data size: 106636 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((s like 'bob%') and (i > 1)) (type: boolean) + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: i (type: int), s (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: string) + sort order: + + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int) + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), KEY.reducesinkkey0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: (_col0 + 1) (type: int), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col1 (type: string) + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int) + Select Operator + expressions: (_col0 + 0) (type: int), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col1 (type: string) + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), KEY._col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Dp Sort State: PARTITION_SORTED + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.over1k_part4_0 + Reducer 4 + Execution mode: llap + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), KEY._col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Dp Sort State: PARTITION_SORTED + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.over1k_part4_1 + + Stage: Stage-3 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + partition: + s + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.over1k_part4_0 + + Stage: Stage-4 + Stats-Aggr Operator + + Stage: Stage-1 + Move Operator + tables: + partition: + s + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.over1k_part4_1 + + Stage: Stage-5 + Stats-Aggr Operator + +PREHOOK: query: EXPLAIN +WITH CTE AS ( + select i, s from over1k where s like 'bob%' +) +FROM ( + select * from CTE where i > 1 ORDER BY s +) src1k +insert overwrite table over1k_part4_0 partition(s) +select i+1, s +insert overwrite table over1k_part4_1 partition(s) +select i+0, s +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +WITH CTE AS ( + select i, s from over1k where s like 'bob%' +) +FROM ( + select * from CTE where i > 1 ORDER BY s +) src1k +insert overwrite table over1k_part4_0 partition(s) +select i+1, s +insert overwrite table over1k_part4_1 partition(s) +select i+0, s +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-3 depends on stages: Stage-2 + Stage-0 depends on stages: Stage-3 + Stage-4 depends on stages: Stage-0 + Stage-1 depends on stages: Stage-3 + Stage-5 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-2 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE) + Reducer 4 <- Reducer 2 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: over1k + Statistics: Num rows: 1025 Data size: 106636 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((s like 'bob%') and (i > 1)) (type: boolean) + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: i (type: int), s (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: string) + sort order: + + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int) + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), KEY.reducesinkkey0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: (_col0 + 1) (type: int), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col1 (type: string) + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int) + Select Operator + expressions: (_col0 + 0) (type: int), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col1 (type: string) + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), KEY._col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Dp Sort State: PARTITION_SORTED + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.over1k_part4_0 + Reducer 4 + Execution mode: llap + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), KEY._col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Dp Sort State: PARTITION_SORTED + Statistics: Num rows: 170 Data size: 17685 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.over1k_part4_1 + + Stage: Stage-3 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + partition: + s + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.over1k_part4_0 + + Stage: Stage-4 + Stats-Aggr Operator + + Stage: Stage-1 + Move Operator + tables: + partition: + s + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.over1k_part4_1 + + Stage: Stage-5 + Stats-Aggr Operator + +PREHOOK: query: select count(1) from over1k_part4_0 +PREHOOK: type: QUERY +PREHOOK: Input: default@over1k_part4_0 +#### A masked pattern was here #### +POSTHOOK: query: select count(1) from over1k_part4_0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@over1k_part4_0 +#### A masked pattern was here #### +0 +PREHOOK: query: select count(1) from over1k_part4_1 +PREHOOK: type: QUERY +PREHOOK: Input: default@over1k_part4_1 +#### A masked pattern was here #### +POSTHOOK: query: select count(1) from over1k_part4_1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@over1k_part4_1 +#### A masked pattern was here #### +0