diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index 404c4d3..9868da4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -227,7 +227,7 @@ public final class HiveFileFormatUtils {
       String type = conf.getCompressType();
       if (type != null && !type.trim().equals("")) {
         CompressionType style = CompressionType.valueOf(type);
-        SequenceFileOutputFormat.setOutputCompressionType(jc, style);
+        SequenceFileOutputFormat.setOutputCompressionType(jc_output, style);
       }
     }
     return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
index 092a5cd..a2c30cd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
@@ -238,11 +238,15 @@ public class GenMRFileSink1 implements NodeProcessor {
     TableDesc ts = (TableDesc) fsConf.getTableInfo().clone();
     fsConf.getTableInfo().getProperties().remove(
         org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
-
+    boolean compressResult = parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSRESULT);
     FileSinkDesc newFSD = new FileSinkDesc(finalName, ts, parseCtx.getConf()
         .getBoolVar(HiveConf.ConfVars.COMPRESSRESULT));
-    FileSinkOperator newOutput = (FileSinkOperator) OperatorFactory.
-      getAndMakeChild(newFSD, inputRS, extract);
+    if (compressResult) {
+      newFSD.setCompressType(parseCtx.getConf().get("io.seqfile.compression.type"));
+      newFSD.setCompressCodec(parseCtx.getConf().get("mapred.output.compression.codec"));
+    }
+    FileSinkOperator newOutput = (FileSinkOperator) OperatorFactory
+        .getAndMakeChild(newFSD, inputRS, extract);
 
     HiveConf conf = parseCtx.getConf();
     MapredWork cplan = createMergeTask(conf, tsMerge, fsConf);
@@ -330,8 +334,13 @@ public class GenMRFileSink1 implements NodeProcessor {
 
     // Create a FileSink operator
     TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
+    boolean compressResult = parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSRESULT);
     FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts,
         parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSRESULT));
+    if (compressResult) {
+      fsOutputDesc.setCompressType(parseCtx.getConf().get("io.seqfile.compression.type"));
+      fsOutputDesc.setCompressCodec(parseCtx.getConf().get("mapred.output.compression.codec"));
+    }
     FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
         fsOutputDesc, inputRS, tsMerge);
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index cb7a542..e504cc0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -4008,6 +4008,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     RowSchema fsRS = new RowSchema(vecCol);
 
+    boolean compressResult = conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT);
     FileSinkDesc fileSinkDesc = new FileSinkDesc(
         queryTmpdir,
         table_desc,
@@ -4018,6 +4019,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         rsCtx.getTotalFiles(),
         rsCtx.getPartnCols(),
         dpCtx);
+    if (compressResult) {
+      fileSinkDesc.setCompressType(conf.get("io.seqfile.compression.type"));
+      fileSinkDesc.setCompressCodec(conf.get("mapred.output.compression.codec"));
+    }
 
     // set the stats publishing/aggregating key prefix
     // the same as directory name. The directory name
diff --git ql/src/test/queries/clientpositive/compress.q ql/src/test/queries/clientpositive/compress.q
new file mode 100644
index 0000000..b3bd05a
--- /dev/null
+++ ql/src/test/queries/clientpositive/compress.q
@@ -0,0 +1,20 @@
+
+CREATE TABLE raw_sequence (key STRING, value STRING)
+  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
+  STORED AS SEQUENCEFILE;
+
+SET hive.exec.compress.output=true;
+SET io.seqfile.compression.type=BLOCK;
+
+EXPLAIN INSERT OVERWRITE TABLE raw_sequence SELECT * FROM src limit 10;
+
+INSERT OVERWRITE TABLE raw_sequence SELECT * FROM src;
+
+CREATE TABLE raw_sequence2 (key STRING, value STRING) PARTITIONED BY (ds string)
+  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
+  STORED AS SEQUENCEFILE;
+
+EXPLAIN INSERT OVERWRITE TABLE raw_sequence2 PARTITION(ds='101') SELECT * FROM src;
+
+INSERT OVERWRITE TABLE raw_sequence2 PARTITION(ds='101') SELECT * FROM src limit 10;
+
diff --git ql/src/test/results/clientpositive/compress.q.out ql/src/test/results/clientpositive/compress.q.out
new file mode 100644
index 0000000..97fe091
--- /dev/null
+++ ql/src/test/results/clientpositive/compress.q.out
@@ -0,0 +1,181 @@
+PREHOOK: query: CREATE TABLE raw_sequence (key STRING, value STRING)
+  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
+  STORED AS SEQUENCEFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE raw_sequence (key STRING, value STRING)
+  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
+  STORED AS SEQUENCEFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@raw_sequence
+PREHOOK: query: EXPLAIN INSERT OVERWRITE TABLE raw_sequence SELECT * FROM src limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN INSERT OVERWRITE TABLE raw_sequence SELECT * FROM src limit 10
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME raw_sequence))) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_LIMIT 10)))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        src
+          TableScan
+            alias: src
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+              outputColumnNames: _col0, _col1
+              Limit
+                Reduce Output Operator
+                  sort order:
+                  tag: -1
+                  value expressions:
+                        expr: _col0
+                        type: string
+                        expr: _col1
+                        type: string
+      Reduce Operator Tree:
+        Extract
+          Limit
+            File Output Operator
+              compressed: true
+              GlobalTableId: 1
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: default.raw_sequence
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.raw_sequence
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
+
+PREHOOK: query: INSERT OVERWRITE TABLE raw_sequence SELECT * FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@raw_sequence
+POSTHOOK: query: INSERT OVERWRITE TABLE raw_sequence SELECT * FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@raw_sequence
+POSTHOOK: Lineage: raw_sequence.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: raw_sequence.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: CREATE TABLE raw_sequence2 (key STRING, value STRING) PARTITIONED BY (ds string)
+  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
+  STORED AS SEQUENCEFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE raw_sequence2 (key STRING, value STRING) PARTITIONED BY (ds string)
+  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
+  STORED AS SEQUENCEFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@raw_sequence2
+POSTHOOK: Lineage: raw_sequence.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: raw_sequence.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: EXPLAIN INSERT OVERWRITE TABLE raw_sequence2 PARTITION(ds='101') SELECT * FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN INSERT OVERWRITE TABLE raw_sequence2 PARTITION(ds='101') SELECT * FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: raw_sequence.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: raw_sequence.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME raw_sequence2) (TOK_PARTSPEC (TOK_PARTVAL ds '101')))) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-5 depends on stages: Stage-1 , consists of Stage-4, Stage-3
+  Stage-4
+  Stage-0 depends on stages: Stage-4, Stage-3
+  Stage-2 depends on stages: Stage-0
+  Stage-3
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        src
+          TableScan
+            alias: src
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+              outputColumnNames: _col0, _col1
+              File Output Operator
+                compressed: true
+                GlobalTableId: 1
+                table:
+                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    name: default.raw_sequence2
+
+  Stage: Stage-5
+    Conditional Operator
+
+  Stage: Stage-4
+    Move Operator
+      files:
+          hdfs directory: true
+          destination: pfile:/home/vaggarw/repos/git_hive/hive05/hive-trunk/hive-trunk/build/ql/scratchdir/hive_2011-08-23_12-06-13_124_665279259237422022/-ext-10000
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds 101
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.raw_sequence2
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
+  Stage: Stage-3
+    Map Reduce
+      Alias -> Map Operator Tree:
+        pfile:/home/vaggarw/repos/git_hive/hive05/hive-trunk/hive-trunk/build/ql/scratchdir/hive_2011-08-23_12-06-13_124_665279259237422022/-ext-10002
+            File Output Operator
+              compressed: true
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: default.raw_sequence2
+
+
+PREHOOK: query: INSERT OVERWRITE TABLE raw_sequence2 PARTITION(ds='101') SELECT * FROM src limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@raw_sequence2@ds=101
+POSTHOOK: query: INSERT OVERWRITE TABLE raw_sequence2 PARTITION(ds='101') SELECT * FROM src limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@raw_sequence2@ds=101
+POSTHOOK: Lineage: raw_sequence.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: raw_sequence.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: raw_sequence2 PARTITION(ds=101).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: raw_sequence2 PARTITION(ds=101).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
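
Reviewer note (not part of the patch): the change records io.seqfile.compression.type and mapred.output.compression.codec on FileSinkDesc when hive.exec.compress.output is true, so the output-side JobConf (jc_output) can be configured instead of the map-side one. The sketch below is only an illustration of how such values are typically applied to a JobConf with the stock Hadoop 0.20 mapred API; the class and method names here (CompressionSettingsSketch, applyCompression) are made up for the example and are not Hive code.

    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;

    public class CompressionSettingsSketch {

      /**
       * Apply the compression settings carried by a file sink to the output JobConf,
       * so SequenceFile writers created from it pick up the intended type and codec.
       */
      public static void applyCompression(JobConf jcOutput, boolean isCompressed,
          String compressType, String compressCodec) throws ClassNotFoundException {
        if (!isCompressed) {
          return;
        }
        FileOutputFormat.setCompressOutput(jcOutput, true);
        if (compressCodec != null && !compressCodec.trim().equals("")) {
          // e.g. org.apache.hadoop.io.compress.GzipCodec (mapred.output.compression.codec)
          Class<? extends CompressionCodec> codec =
              Class.forName(compressCodec).asSubclass(CompressionCodec.class);
          FileOutputFormat.setOutputCompressorClass(jcOutput, codec);
        }
        if (compressType != null && !compressType.trim().equals("")) {
          // e.g. BLOCK, RECORD or NONE (io.seqfile.compression.type)
          SequenceFileOutputFormat.setOutputCompressionType(jcOutput,
              CompressionType.valueOf(compressType));
        }
      }
    }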