diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
index 148c952..e523baf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
@@ -153,12 +153,38 @@ public class GenMRFileSink1 implements NodeProcessor {
     // merge for a map-only job
     // or for a map-reduce job
     MapredWork currWork = (MapredWork) currTask.getWork();
+    Operator currReducer = currWork.getReducer();
+
+    boolean isInReducer = false;
+    if (currReducer != null) {
+      // check whether the FileSinkOperator being processed is in the
+      // reducer part of the plan - this determines which merge
+      // configuration parameter is honored
+      if (stack != null) {
+        // walk the stack downward from just below the top (the top is
+        // the FileSinkOperator currently being processed) and check
+        // whether currReducer is encountered before any other
+        // FileSinkOperator
+        for (int i = stack.size() - 2; i >= 0; i--) {
+          if (stack.get(i) == currReducer) {
+            isInReducer = true;
+            break;
+          } else if (stack.get(i) instanceof FileSinkOperator) {
+            break;
+          }
+        }
+      }
+    }
+    // if the FileSinkOperator is not in the reduce part, honor the
+    // merge-map-files config parameter
     boolean mergeMapOnly =
       hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES) &&
-        currWork.getReducer() == null;
+        !isInReducer;
+    // if the FileSinkOperator is in the reduce part, honor the
+    // merge-mapred-files config parameter
     boolean mergeMapRed =
       hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES) &&
-        currWork.getReducer() != null;
+        isInReducer;
     if (mergeMapOnly || mergeMapRed) {
       chDir = true;
     }
diff --git ql/src/test/queries/clientpositive/optimal_merge.q ql/src/test/queries/clientpositive/optimal_merge.q
new file mode 100644
index 0000000..d08e854
--- /dev/null
+++ ql/src/test/queries/clientpositive/optimal_merge.q
@@ -0,0 +1,28 @@
+-- drop any tables remaining from previous run
+drop table optimal_merge_table_src;
+drop table optimal_merge_table1;
+drop table optimal_merge_table2;
+
+create table if not exists optimal_merge_table_src (key string, value string);
+create table if not exists optimal_merge_table1 like optimal_merge_table_src;
+create table if not exists optimal_merge_table2 (key string, value string) stored as RCFILE;
+
+set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+set hive.merge.mapfiles=true;
+set hive.merge.mapredfiles=false;
+
+load data local inpath '../data/files/T1.txt' into table optimal_merge_table_src;
+load data local inpath '../data/files/T2.txt' into table optimal_merge_table_src;
+
+explain
+from optimal_merge_table_src
+insert overwrite table optimal_merge_table1 select key, count(*) group by key
+insert overwrite table optimal_merge_table2 select *;
+
+from optimal_merge_table_src
+insert overwrite table optimal_merge_table1 select key, count(*) group by key
+insert overwrite table optimal_merge_table2 select *;
+
+-- the output should show that the table only has 1 file
+show table extended like optimal_merge_table2;
+
diff --git ql/src/test/results/clientpositive/optimal_merge.q.out ql/src/test/results/clientpositive/optimal_merge.q.out
new file mode 100644
index 0000000..dbc1f8b
--- /dev/null
+++ ql/src/test/results/clientpositive/optimal_merge.q.out
@@ -0,0 +1,229 @@
+PREHOOK: query: -- drop any tables remaining from previous run
+drop table optimal_merge_table_src
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- drop any tables remaining from previous run
+drop table optimal_merge_table_src
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table optimal_merge_table1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table optimal_merge_table1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table optimal_merge_table2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table optimal_merge_table2
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table if not exists optimal_merge_table_src (key string, value string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table if not exists optimal_merge_table_src (key string, value string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@optimal_merge_table_src
+PREHOOK: query: create table if not exists optimal_merge_table1 like optimal_merge_table_src
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table if not exists optimal_merge_table1 like optimal_merge_table_src
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@optimal_merge_table1
+PREHOOK: query: create table if not exists optimal_merge_table2 (key string, value string) stored as RCFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table if not exists optimal_merge_table2 (key string, value string) stored as RCFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@optimal_merge_table2
+PREHOOK: query: load data local inpath '../data/files/T1.txt' into table optimal_merge_table_src
+PREHOOK: type: LOAD
+PREHOOK: Output: default@optimal_merge_table_src
+POSTHOOK: query: load data local inpath '../data/files/T1.txt' into table optimal_merge_table_src
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@optimal_merge_table_src
+PREHOOK: query: load data local inpath '../data/files/T2.txt' into table optimal_merge_table_src
+PREHOOK: type: LOAD
+PREHOOK: Output: default@optimal_merge_table_src
+POSTHOOK: query: load data local inpath '../data/files/T2.txt' into table optimal_merge_table_src
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@optimal_merge_table_src
+PREHOOK: query: explain
+from optimal_merge_table_src
+insert overwrite table optimal_merge_table1 select key, count(*) group by key
+insert overwrite table optimal_merge_table2 select *
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+from optimal_merge_table_src
+insert overwrite table optimal_merge_table1 select key, count(*) group by key
+insert overwrite table optimal_merge_table2 select *
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME optimal_merge_table_src))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME optimal_merge_table1))) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))) (TOK_GROUPBY (TOK_TABLE_OR_COL key))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME optimal_merge_table2))) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))
+
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+  Stage-9 depends on stages: Stage-2 , consists of Stage-6, Stage-5, Stage-7
+  Stage-6
+  Stage-1 depends on stages: Stage-6, Stage-5, Stage-8
+  Stage-4 depends on stages: Stage-1
+  Stage-5
+  Stage-7
+  Stage-8 depends on stages: Stage-7
+
+STAGE PLANS:
+  Stage: Stage-2
+    Map Reduce
+      Alias -> Map Operator Tree:
+        optimal_merge_table_src 
+          TableScan
+            alias: optimal_merge_table_src
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+              outputColumnNames: key
+              Group By Operator
+                aggregations:
+                      expr: count()
+                bucketGroup: false
+                keys:
+                      expr: key
+                      type: string
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Reduce Output Operator
+                  key expressions:
+                        expr: _col0
+                        type: string
+                  sort order: +
+                  Map-reduce partition columns:
+                        expr: _col0
+                        type: string
+                  tag: -1
+                  value expressions:
+                        expr: _col1
+                        type: bigint
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+              outputColumnNames: _col0, _col1
+              File Output Operator
+                compressed: false
+                GlobalTableId: 2
+                table:
+                    input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
+                    name: default.optimal_merge_table2
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(VALUE._col0)
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: string
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: string
+                  expr: _col1
+                  type: bigint
+            outputColumnNames: _col0, _col1
+            File Output Operator
+              compressed: false
+              GlobalTableId: 1
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: default.optimal_merge_table1
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.optimal_merge_table1
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+  Stage: Stage-9
+    Conditional Operator
+
+  Stage: Stage-6
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+  Stage: Stage-1
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+              output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+              serde: org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
+              name: default.optimal_merge_table2
+
+  Stage: Stage-4
+    Stats-Aggr Operator
+
+  Stage: Stage-5
+    Block level merge
+
+  Stage: Stage-7
+    Block level merge
+
+  Stage: Stage-8
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+
+PREHOOK: query: from optimal_merge_table_src
+insert overwrite table optimal_merge_table1 select key, count(*) group by key
+insert overwrite table optimal_merge_table2 select *
+PREHOOK: type: QUERY
+PREHOOK: Input: default@optimal_merge_table_src
+PREHOOK: Output: default@optimal_merge_table1
+PREHOOK: Output: default@optimal_merge_table2
+POSTHOOK: query: from optimal_merge_table_src
+insert overwrite table optimal_merge_table1 select key, count(*) group by key
+insert overwrite table optimal_merge_table2 select *
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@optimal_merge_table_src
+POSTHOOK: Output: default@optimal_merge_table1
+POSTHOOK: Output: default@optimal_merge_table2
+POSTHOOK: Lineage: optimal_merge_table1.key SIMPLE [(optimal_merge_table_src)optimal_merge_table_src.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: optimal_merge_table1.value EXPRESSION [(optimal_merge_table_src)optimal_merge_table_src.null, ]
+POSTHOOK: Lineage: optimal_merge_table2.key SIMPLE [(optimal_merge_table_src)optimal_merge_table_src.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: optimal_merge_table2.value SIMPLE [(optimal_merge_table_src)optimal_merge_table_src.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: -- the output should show that the table only has 1 file
+show table extended like optimal_merge_table2
+PREHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: query: -- the output should show that the table only has 1 file
+show table extended like optimal_merge_table2
+POSTHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: Lineage: optimal_merge_table1.key SIMPLE [(optimal_merge_table_src)optimal_merge_table_src.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: optimal_merge_table1.value EXPRESSION [(optimal_merge_table_src)optimal_merge_table_src.null, ]
+POSTHOOK: Lineage: optimal_merge_table2.key SIMPLE [(optimal_merge_table_src)optimal_merge_table_src.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: optimal_merge_table2.value SIMPLE [(optimal_merge_table_src)optimal_merge_table_src.FieldSchema(name:value, type:string, comment:null), ]
+tableName:optimal_merge_table2
+#### A masked pattern was here ####
+inputformat:org.apache.hadoop.hive.ql.io.RCFileInputFormat
+outputformat:org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+columns:struct columns { string key, string value}
+partitioned:false
+partitionColumns:
+totalNumberFiles:1
+totalFileSize:138
+maxFileSize:138
+minFileSize:138
+#### A masked pattern was here ####
+
diff --git ql/src/test/results/clientpositive/union19.q.out ql/src/test/results/clientpositive/union19.q.out
index e3f33cb..5d21005 100644
--- ql/src/test/results/clientpositive/union19.q.out
+++ ql/src/test/results/clientpositive/union19.q.out
@@ -34,8 +34,13 @@ STAGE DEPENDENCIES:
   Stage-3 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-3
   Stage-4 depends on stages: Stage-0
-  Stage-1 depends on stages: Stage-3
+  Stage-10 depends on stages: Stage-3 , consists of Stage-7, Stage-6, Stage-8
+  Stage-7
+  Stage-1 depends on stages: Stage-7, Stage-6, Stage-9
   Stage-5 depends on stages: Stage-1
+  Stage-6
+  Stage-8
+  Stage-9 depends on stages: Stage-8
 
 STAGE PLANS:
   Stage: Stage-2
@@ -224,6 +229,15 @@ STAGE PLANS:
   Stage: Stage-4
     Stats-Aggr Operator
 
+  Stage: Stage-10
+    Conditional Operator
+
+  Stage: Stage-7
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
   Stage: Stage-1
     Move Operator
       tables:
           replace: true
           table:
               input format: org.apache.hadoop.mapred.TextInputFormat
               output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest2
@@ -237,6 +251,38 @@ STAGE PLANS:
   Stage: Stage-5
     Stats-Aggr Operator
 
+  Stage: Stage-6
+    Map Reduce
+      Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+          File Output Operator
+            compressed: false
+            GlobalTableId: 0
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                name: default.dest2
+
+  Stage: Stage-8
+    Map Reduce
+      Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+          File Output Operator
+            compressed: false
+            GlobalTableId: 0
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                name: default.dest2
+
+  Stage: Stage-9
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
 PREHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1 UNION ALL
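
Reviewer note (not part of the patch): the reducer-detection walk in GenMRFileSink1 can be read in isolation. The sketch below restates the loop over plain java.util types; Op, FileSinkOp, ReducerOp, and isInReducer are stand-ins invented for this illustration, not Hive classes — only the traversal logic mirrors the patch.

import java.util.ArrayList;
import java.util.List;

public class ReducerScanSketch {

  // Hypothetical stand-ins for Hive's operator types, for illustration only.
  static class Op {}
  static class FileSinkOp extends Op {}
  static class ReducerOp extends Op {}

  // Mirrors the patch: start just below the top of the traversal stack
  // (the top entry is the FileSinkOperator currently being processed)
  // and walk towards the bottom. Reaching the reducer first means the
  // sink is on the reduce side; hitting another file sink first means
  // the sink hangs off the map side.
  static boolean isInReducer(List<Op> stack, Op reducer) {
    if (reducer == null || stack == null) {
      return false;
    }
    for (int i = stack.size() - 2; i >= 0; i--) {
      Op op = stack.get(i);
      if (op == reducer) {
        return true;   // reduce-side sink: hive.merge.mapredfiles applies
      }
      if (op instanceof FileSinkOp) {
        return false;  // map-side sink: hive.merge.mapfiles applies
      }
    }
    return false;      // reducer never seen on the path to this sink
  }

  public static void main(String[] args) {
    Op reducer = new ReducerOp();

    // Path that runs through the reducer down to the current sink.
    List<Op> reduceSide = new ArrayList<Op>();
    reduceSide.add(reducer);
    reduceSide.add(new Op());          // e.g. the Select below the reducer
    reduceSide.add(new FileSinkOp());  // top of stack: the sink in hand
    System.out.println(isInReducer(reduceSide, reducer));   // true

    // Map-side path of the same multi-insert: no reducer above the sink.
    List<Op> mapSide = new ArrayList<Op>();
    mapSide.add(new Op());             // e.g. the TableScan
    mapSide.add(new FileSinkOp());     // top of stack: the sink in hand
    System.out.println(isInReducer(mapSide, reducer));      // false
  }
}

This is the optimal_merge.q scenario: the multi-insert plan has a reducer for the group-by sink, so the old currWork.getReducer() != null test misclassified the map-side sink into optimal_merge_table2. With the per-sink walk, hive.merge.mapfiles=true still merges that sink's output down to the single file the expected output asserts.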