diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index 404c4d3..9868da4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -227,7 +227,7 @@ public final class HiveFileFormatUtils { String type = conf.getCompressType(); if (type != null && !type.trim().equals("")) { CompressionType style = CompressionType.valueOf(type); - SequenceFileOutputFormat.setOutputCompressionType(jc, style); + SequenceFileOutputFormat.setOutputCompressionType(jc_output, style); } } return getRecordWriter(jc_output, hiveOutputFormat, outputClass, diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java index 092a5cd..a2c30cd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java @@ -238,11 +238,15 @@ public class GenMRFileSink1 implements NodeProcessor { TableDesc ts = (TableDesc) fsConf.getTableInfo().clone(); fsConf.getTableInfo().getProperties().remove( org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS); - + boolean compressResult = parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSRESULT); FileSinkDesc newFSD = new FileSinkDesc(finalName, ts, parseCtx.getConf() .getBoolVar(HiveConf.ConfVars.COMPRESSRESULT)); - FileSinkOperator newOutput = (FileSinkOperator) OperatorFactory. - getAndMakeChild(newFSD, inputRS, extract); + if (compressResult) { + newFSD.setCompressType(parseCtx.getConf().get("io.seqfile.compression.type")); + newFSD.setCompressCodec(parseCtx.getConf().get("mapred.output.compression.codec")); + } + FileSinkOperator newOutput = (FileSinkOperator) OperatorFactory + .getAndMakeChild(newFSD, inputRS, extract); HiveConf conf = parseCtx.getConf(); MapredWork cplan = createMergeTask(conf, tsMerge, fsConf); @@ -330,8 +334,13 @@ public class GenMRFileSink1 implements NodeProcessor { // Create a FileSink operator TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone(); + boolean compressResult = parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSRESULT); FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts, parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSRESULT)); + if (compressResult) { + fsOutputDesc.setCompressType(parseCtx.getConf().get("io.seqfile.compression.type")); + fsOutputDesc.setCompressCodec(parseCtx.getConf().get("mapred.output.compression.codec")); + } FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild( fsOutputDesc, inputRS, tsMerge); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 8fe5984..79bf10f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -3940,6 +3940,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { RowSchema fsRS = new RowSchema(vecCol); + boolean compressResult = conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT); FileSinkDesc fileSinkDesc = new FileSinkDesc( queryTmpdir, table_desc, @@ -3950,6 +3951,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx); + if (compressResult) { + fileSinkDesc.setCompressType(conf.get("io.seqfile.compression.type")); + fileSinkDesc.setCompressCodec(conf.get("mapred.output.compression.codec")); + } // set the stats publishing/aggregating key prefix // the same as directory name. The directory name