diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
index 8b02b42..c57dd99 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
@@ -64,6 +64,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
 import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.ParquetWriter;
 import parquet.io.api.Binary;
 
 /**
@@ -79,8 +80,9 @@
   public static final Text MAP = new Text("map");
   public static final Text ARRAY = new Text("bag");
 
-  // default compression type is uncompressed
-  private static final String DEFAULTCOMPRESSION = "UNCOMPRESSED";
+  // default compression type for parquet output format
+  private static final String DEFAULTCOMPRESSION =
+      ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME.name();
 
   // Map precision to the number bytes needed for binary conversion.
   public static final int PRECISION_TO_BYTE_COUNT[] = new int[38];
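The serde change above swaps the hard-coded "UNCOMPRESSED" literal for the default that parquet-hadoop itself declares, so the Hive-side constant can never drift from the library default. A minimal sketch confirming the two agree, using the same pre-org.apache parquet package names as the patch; the check class is hypothetical and not part of the change:

import parquet.hadoop.ParquetWriter;

// Hypothetical check, not part of the patch: DEFAULT_COMPRESSION_CODEC_NAME
// is CompressionCodecName.UNCOMPRESSED in parquet-hadoop, so .name() still
// evaluates to the old literal "UNCOMPRESSED".
public class DefaultCompressionCheck {
  public static void main(String[] args) {
    System.out.println(ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME.name());
  }
}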
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
index 765b5ac..e52c4bc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
@@ -18,10 +18,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -55,21 +57,13 @@
       }
       taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
 
+      LOG.info("initialize serde with table properties.");
+      initializeSerProperties(taskContext, tableProperties);
+
       LOG.info("creating real writer to write at " + name);
-      String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
-      if (compressionName != null && !compressionName.isEmpty()) {
-        //get override compression properties via "tblproperties" clause if it is set
-        LOG.debug("get override compression properties via tblproperties");
-
-        ContextUtil.getConfiguration(taskContext);
-        CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
-        realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(jobConf,
-            new Path(name), codecName);
-      } else {
-        realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext,
-            new Path(name));
-      }
+      realWriter =
+          ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
 
       LOG.info("real writer: " + realWriter);
     } catch (final InterruptedException e) {
@@ -77,6 +71,31 @@
     }
   }
 
+  private void initializeSerProperties(JobContext job, Properties tableProperties) {
+    String blockSize = tableProperties.getProperty(ParquetOutputFormat.BLOCK_SIZE);
+    Configuration conf = ContextUtil.getConfiguration(job);
+    if (blockSize != null && !blockSize.isEmpty()) {
+      LOG.debug("get override parquet.block.size property via tblproperties");
+      conf.setInt(ParquetOutputFormat.BLOCK_SIZE, Integer.valueOf(blockSize));
+    }
+
+    String enableDictionaryPage =
+        tableProperties.getProperty(ParquetOutputFormat.ENABLE_DICTIONARY);
+    if (enableDictionaryPage != null && !enableDictionaryPage.isEmpty()) {
+      LOG.debug("get override parquet.enable.dictionary property via tblproperties");
+      conf.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY,
+          Boolean.valueOf(enableDictionaryPage));
+    }
+
+    String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
+    if (compressionName != null && !compressionName.isEmpty()) {
+      //get override compression properties via "tblproperties" clause if it is set
+      LOG.debug("get override compression properties via tblproperties");
+      CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
+      conf.set(ParquetOutputFormat.COMPRESSION, codecName.name());
+    }
+  }
+
   @Override
   public void close(final Reporter reporter) throws IOException {
     try {
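The rewritten wrapper no longer special-cases compression: initializeSerProperties copies every supported tblproperty into the task Configuration, and the single getRecordWriter(taskContext, ...) call then picks them up through ParquetOutputFormat's normal configuration path. A minimal standalone sketch of that mapping, assuming the same pre-org.apache parquet packages; the driver class and sample property values are hypothetical:

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;

import parquet.hadoop.ParquetOutputFormat;
import parquet.hadoop.metadata.CompressionCodecName;

// Hypothetical driver, not part of the patch: mirrors what
// initializeSerProperties does for the three supported tblproperties.
public class SerPropertiesSketch {
  public static void main(String[] args) {
    Properties tableProperties = new Properties();
    tableProperties.setProperty(ParquetOutputFormat.COMPRESSION, "LZO");
    tableProperties.setProperty(ParquetOutputFormat.BLOCK_SIZE, "134217728");
    tableProperties.setProperty(ParquetOutputFormat.ENABLE_DICTIONARY, "false");

    Configuration conf = new Configuration();

    String blockSize = tableProperties.getProperty(ParquetOutputFormat.BLOCK_SIZE);
    if (blockSize != null && !blockSize.isEmpty()) {
      conf.setInt(ParquetOutputFormat.BLOCK_SIZE, Integer.parseInt(blockSize));
    }

    String enableDictionary = tableProperties.getProperty(ParquetOutputFormat.ENABLE_DICTIONARY);
    if (enableDictionary != null && !enableDictionary.isEmpty()) {
      conf.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, Boolean.parseBoolean(enableDictionary));
    }

    String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
    if (compressionName != null && !compressionName.isEmpty()) {
      // fromConf normalizes the user-supplied string to a known codec name
      CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
      conf.set(ParquetOutputFormat.COMPRESSION, codecName.name());
    }

    System.out.println(conf.get(ParquetOutputFormat.COMPRESSION));        // LZO
    System.out.println(conf.getInt(ParquetOutputFormat.BLOCK_SIZE, -1));  // 134217728
  }
}

An unset property is simply skipped, so the writer falls back to ParquetOutputFormat's own defaults rather than a Hive-side value.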
diff --git ql/src/test/queries/clientpositive/create_like.q ql/src/test/queries/clientpositive/create_like.q
index 7271306..69e47ab 100644
--- ql/src/test/queries/clientpositive/create_like.q
+++ ql/src/test/queries/clientpositive/create_like.q
@@ -65,8 +65,8 @@
 DESCRIBE FORMATTED doctors;
 CREATE TABLE doctors2 like doctors;
 DESCRIBE FORMATTED doctors2;
 
-CREATE TABLE CompressedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO");
-CREATE TABLE LikeCompressedParquetTable LIKE CompressedParquetTable;
+CREATE TABLE PropertiedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO");
+CREATE TABLE LikePropertiedParquetTable LIKE PropertiedParquetTable;
 
-DESCRIBE FORMATTED LikeCompressedParquetTable;
+DESCRIBE FORMATTED LikePropertiedParquetTable;
diff --git ql/src/test/results/clientpositive/create_like.q.out ql/src/test/results/clientpositive/create_like.q.out
index 0c82cea..b9e2783 100644
--- ql/src/test/results/clientpositive/create_like.q.out
+++ ql/src/test/results/clientpositive/create_like.q.out
@@ -405,28 +405,28 @@
 Bucket Columns:     	[]
 Sort Columns:       	[]
 Storage Desc Params:
 	serialization.format	1
-PREHOOK: query: CREATE TABLE CompressedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO")
+PREHOOK: query: CREATE TABLE PropertiedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO")
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
-PREHOOK: Output: default@CompressedParquetTable
-POSTHOOK: query: CREATE TABLE CompressedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO")
+PREHOOK: Output: default@PropertiedParquetTable
+POSTHOOK: query: CREATE TABLE PropertiedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO")
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
-POSTHOOK: Output: default@CompressedParquetTable
-PREHOOK: query: CREATE TABLE LikeCompressedParquetTable LIKE CompressedParquetTable
+POSTHOOK: Output: default@PropertiedParquetTable
+PREHOOK: query: CREATE TABLE LikePropertiedParquetTable LIKE PropertiedParquetTable
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
-PREHOOK: Output: default@LikeCompressedParquetTable
-POSTHOOK: query: CREATE TABLE LikeCompressedParquetTable LIKE CompressedParquetTable
+PREHOOK: Output: default@LikePropertiedParquetTable
+POSTHOOK: query: CREATE TABLE LikePropertiedParquetTable LIKE PropertiedParquetTable
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
-POSTHOOK: Output: default@LikeCompressedParquetTable
-PREHOOK: query: DESCRIBE FORMATTED LikeCompressedParquetTable
+POSTHOOK: Output: default@LikePropertiedParquetTable
+PREHOOK: query: DESCRIBE FORMATTED LikePropertiedParquetTable
 PREHOOK: type: DESCTABLE
-PREHOOK: Input: default@likecompressedparquettable
-POSTHOOK: query: DESCRIBE FORMATTED LikeCompressedParquetTable
+PREHOOK: Input: default@likepropertiedparquettable
+POSTHOOK: query: DESCRIBE FORMATTED LikePropertiedParquetTable
 POSTHOOK: type: DESCTABLE
-POSTHOOK: Input: default@likecompressedparquettable
+POSTHOOK: Input: default@likepropertiedparquettable
 # col_name          	data_type           	comment
 
 a                   	int