diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
index 30c91ea..ea4109d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
@@ -21,6 +21,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -40,6 +41,8 @@
 import org.apache.hadoop.util.Progressable;
 
 import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.hadoop.util.ContextUtil;
 
 /**
  *
@@ -110,15 +113,19 @@ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws
     }
 
     DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
-    return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), progress);
+
+    return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
+        progress, tableProperties);
   }
 
   protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
       ParquetOutputFormat realOutputFormat,
       JobConf jobConf,
       String finalOutPath,
-      Progressable progress
+      Progressable progress,
+      Properties tableProperties
       ) throws IOException {
-    return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), progress);
+    return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
+        progress, tableProperties);
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
index 07b003a..765b5ac 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
@@ -14,6 +14,7 @@
 package org.apache.hadoop.hive.ql.io.parquet.write;
 
 import java.io.IOException;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +30,7 @@
 import org.apache.hadoop.util.Progressable;
 
 import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.util.ContextUtil;
 
 public class ParquetRecordWriterWrapper implements RecordWriter,
@@ -43,7 +45,8 @@ public ParquetRecordWriterWrapper(
       final OutputFormat realOutputFormat,
       final JobConf jobConf,
       final String name,
-      final Progressable progress) throws IOException {
+      final Progressable progress, Properties tableProperties) throws
+          IOException {
     try {
       // create a TaskInputOutputContext
       TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
@@ -53,7 +56,21 @@ public ParquetRecordWriterWrapper(
       taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
 
       LOG.info("creating real writer to write at " + name);
-      realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
+
+      String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
+      if (compressionName != null && !compressionName.isEmpty()) {
+        // get override compression properties via "tblproperties" clause if it is set
+        LOG.debug("get override compression properties via tblproperties");
+
+        ContextUtil.getConfiguration(taskContext);
+        CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
+        realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(jobConf,
+            new Path(name), codecName);
+      } else {
+        realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext,
+            new Path(name));
+      }
+
       LOG.info("real writer: " + realWriter);
     } catch (final InterruptedException e) {
       throw new IOException(e);
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
index 417676d..667d367 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
@@ -73,7 +73,8 @@ protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
           ParquetOutputFormat realOutputFormat,
           JobConf jobConf,
           String finalOutPath,
-          Progressable progress
+          Progressable progress,
+          Properties tableProperties
           ) throws IOException {
         assertEquals(outputFormat, realOutputFormat);
         assertNotNull(jobConf.get(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA));
@@ -87,4 +88,17 @@ protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
       assertEquals("passed tests", e.getMessage());
     }
   }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidCompressionTableProperties() throws IOException {
+    Properties tableProps = new Properties();
+    tableProps.setProperty("parquet.compression", "unsupported");
+    tableProps.setProperty("columns", "foo,bar");
+    tableProps.setProperty("columns.types", "int:int");
+
+    JobConf jobConf = new JobConf();
+
+    new MapredParquetOutputFormat().getHiveRecordWriter(jobConf,
+        new Path("/foo"), null, false, tableProps, null);
+  }
 }