diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java index 148c952..846296e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.MapRedTask; import org.apache.hadoop.hive.ql.exec.MoveTask; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -152,13 +153,40 @@ public class GenMRFileSink1 implements NodeProcessor { // There are separate configuration parameters to control whether to // merge for a map-only job // or for a map-reduce job - MapredWork currWork = (MapredWork) currTask.getWork(); + + // check whether the FileSinkOperator we are processing is in the + // reducer part - this will influence how we honor the merge + // configuration parameters + boolean isInReducer = false; + if(stack != null) { + Stack opStackCopy = (Stack) stack.clone(); + if (!opStackCopy.isEmpty()) { + // pop the top element since that is just the FileSinkOperator + // we are currently processing + opStackCopy.pop(); + // look in the remaining stack for a ReduceSinkOperator before + // any other FileSinkOperator + while (!opStackCopy.isEmpty()) { + Node n = opStackCopy.pop(); + if (n instanceof ReduceSinkOperator) { + isInReducer = true; + break; + } else if (n instanceof FileSinkOperator) { + break; + } + } + } + } + // if the FileSinkOperator is not in the reduce part we can honor + // the merge map files config parameter boolean mergeMapOnly = hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES) && - currWork.getReducer() == null; + !isInReducer; + // if the FileSinkOperator is in the reduce part we can honor the + // merge mapred files config parameter boolean mergeMapRed = hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES) && - currWork.getReducer() != null; + isInReducer; if (mergeMapOnly || mergeMapRed) { chDir = true; } diff --git ql/src/test/queries/clientpositive/optimal_merge.q ql/src/test/queries/clientpositive/optimal_merge.q new file mode 100644 index 0000000..fa11ffc --- /dev/null +++ ql/src/test/queries/clientpositive/optimal_merge.q @@ -0,0 +1,11 @@ +create table if not exists optimal_merge_table1 like src; +create table if not exists optimal_merge_table2 like src; + +set hive.merge.mapfiles=true; +set hive.merge.mapredfiles=false; +explain from src +insert overwrite table optimal_merge_table1 select key, count(*) group by key +insert overwrite table optimal_merge_table2 select *; + +drop table optimal_merge_table1; +drop table optimal_merge_table2; diff --git ql/src/test/results/clientpositive/optimal_merge.q.out ql/src/test/results/clientpositive/optimal_merge.q.out new file mode 100644 index 0000000..761b364 --- /dev/null +++ ql/src/test/results/clientpositive/optimal_merge.q.out @@ -0,0 +1,191 @@ +PREHOOK: query: create table if not exists optimal_merge_table1 like src +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table if not exists optimal_merge_table1 like src +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@optimal_merge_table1 +PREHOOK: query: create table if not exists optimal_merge_table2 like src +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table if not exists optimal_merge_table2 like src +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@optimal_merge_table2 +PREHOOK: query: explain from src +insert overwrite table optimal_merge_table1 select key, count(*) group by key +insert overwrite table optimal_merge_table2 select * +PREHOOK: type: QUERY +POSTHOOK: query: explain from src +insert overwrite table optimal_merge_table1 select key, count(*) group by key +insert overwrite table optimal_merge_table2 select * +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME optimal_merge_table1))) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))) (TOK_GROUPBY (TOK_TABLE_OR_COL key))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME optimal_merge_table2))) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)))) + +STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + Stage-9 depends on stages: Stage-2 , consists of Stage-6, Stage-5, Stage-7 + Stage-6 + Stage-1 depends on stages: Stage-6, Stage-5, Stage-8 + Stage-4 depends on stages: Stage-1 + Stage-5 + Stage-7 + Stage-8 depends on stages: Stage-7 + +STAGE PLANS: + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: key + type: string + outputColumnNames: key + Group By Operator + aggregations: + expr: count() + bucketGroup: false + keys: + expr: key + type: string + mode: hash + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + Map-reduce partition columns: + expr: _col0 + type: string + tag: -1 + value expressions: + expr: _col1 + type: bigint + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 2 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.optimal_merge_table2 + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: bigint + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.optimal_merge_table1 + + Stage: Stage-0 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.optimal_merge_table1 + + Stage: Stage-3 + Stats-Aggr Operator + + Stage: Stage-9 + Conditional Operator + + Stage: Stage-6 + Move Operator + files: + hdfs directory: true +#### A masked pattern was here #### + + Stage: Stage-1 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.optimal_merge_table2 + + Stage: Stage-4 + Stats-Aggr Operator + + Stage: Stage-5 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.optimal_merge_table2 + + Stage: Stage-7 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.optimal_merge_table2 + + Stage: Stage-8 + Move Operator + files: + hdfs directory: true +#### A masked pattern was here #### + + +PREHOOK: query: drop table optimal_merge_table1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@optimal_merge_table1 +PREHOOK: Output: default@optimal_merge_table1 +POSTHOOK: query: drop table optimal_merge_table1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@optimal_merge_table1 +POSTHOOK: Output: default@optimal_merge_table1 +PREHOOK: query: drop table optimal_merge_table2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@optimal_merge_table2 +PREHOOK: Output: default@optimal_merge_table2 +POSTHOOK: query: drop table optimal_merge_table2 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@optimal_merge_table2 +POSTHOOK: Output: default@optimal_merge_table2