diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
index 8b02b42..0d07751 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
 import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.ParquetWriter;
 import parquet.io.api.Binary;
 
 /**
@@ -72,15 +73,17 @@
  *
  */
 @SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES,
-    ParquetOutputFormat.COMPRESSION})
+    ParquetOutputFormat.COMPRESSION, ParquetOutputFormat.BLOCK_SIZE,
+    ParquetOutputFormat.ENABLE_DICTIONARY})
 public class ParquetHiveSerDe extends AbstractSerDe {
   public static final Text MAP_KEY = new Text("key");
   public static final Text MAP_VALUE = new Text("value");
   public static final Text MAP = new Text("map");
   public static final Text ARRAY = new Text("bag");
 
-  // default compression type is uncompressed
-  private static final String DEFAULTCOMPRESSION = "UNCOMPRESSED";
+  // Default compression codec for the Parquet output format.
+  private static final String DEFAULTCOMPRESSION =
+      ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME.name();
 
   // Map precision to the number bytes needed for binary conversion.
   public static final int PRECISION_TO_BYTE_COUNT[] = new int[38];
@@ -105,6 +108,9 @@
   private long serializedSize;
   private long deserializedSize;
   private String compressionType;
 
+  private int blockSize;
+  private boolean enableDictionary;
+
   @Override
   public final void initialize(final Configuration conf, final Properties tbl) throws SerDeException {
@@ -118,6 +124,10 @@ public final void initialize(final Configuration conf, final Properties tbl) thr
     // Get compression properties
     compressionType = tbl.getProperty(ParquetOutputFormat.COMPRESSION, DEFAULTCOMPRESSION);
+    blockSize = Integer.valueOf(tbl.getProperty(ParquetOutputFormat.BLOCK_SIZE,
+        String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE)));
+    enableDictionary = Boolean.valueOf(tbl.getProperty(ParquetOutputFormat.ENABLE_DICTIONARY,
+        String.valueOf(ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED)));
 
     if (columnNameProperty.length() == 0) {
       columnNames = new ArrayList<String>();
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
index 765b5ac..cd4d5d9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -55,21 +56,13 @@ public ParquetRecordWriterWrapper(
       }
       taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
 
+      LOG.info("initializing SerDe with table properties");
+      initializeSerProperties(taskContext, tableProperties);
+
       LOG.info("creating real writer to write at " + name);
-      String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
-      if (compressionName != null && !compressionName.isEmpty()) {
-        //get override compression properties via "tblproperties" clause if it is set
"tblproperties" clause if it is set - LOG.debug("get override compression properties via tblproperties"); - - ContextUtil.getConfiguration(taskContext); - CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName); - realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(jobConf, - new Path(name), codecName); - } else { - realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, - new Path(name)); - } + realWriter = + ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name)); LOG.info("real writer: " + realWriter); } catch (final InterruptedException e) { @@ -77,6 +70,31 @@ public ParquetRecordWriterWrapper( } } + private void initializeSerProperties(JobContext job, Properties tableProperties) { + String blockSize = tableProperties.getProperty(ParquetOutputFormat.BLOCK_SIZE); + if (blockSize != null && !blockSize.isEmpty()) { + LOG.debug("get override parquet.block.size property via tblproperties"); + ContextUtil.getConfiguration(job) + .setInt(ParquetOutputFormat.BLOCK_SIZE, Integer.valueOf(blockSize)); + } + + String enableDictionaryPage = + tableProperties.getProperty(ParquetOutputFormat.ENABLE_DICTIONARY); + if (enableDictionaryPage != null && !enableDictionaryPage.isEmpty()) { + LOG.debug("get override parquet.enable.dictionary property via tblproperties"); + ContextUtil.getConfiguration(job).setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, + Boolean.valueOf(enableDictionaryPage)); + } + + String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION); + if (compressionName != null && !compressionName.isEmpty()) { + //get override compression properties via "tblproperties" clause if it is set + LOG.debug("get override compression properties via tblproperties"); + CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName); + ContextUtil.getConfiguration(job).set(ParquetOutputFormat.COMPRESSION, codecName.name()); + } + } + @Override public void close(final Reporter reporter) throws IOException { try { diff --git ql/src/test/queries/clientpositive/create_like.q ql/src/test/queries/clientpositive/create_like.q index 7271306..ad15e42 100644 --- ql/src/test/queries/clientpositive/create_like.q +++ ql/src/test/queries/clientpositive/create_like.q @@ -65,8 +65,15 @@ DESCRIBE FORMATTED doctors; CREATE TABLE doctors2 like doctors; DESCRIBE FORMATTED doctors2; -CREATE TABLE CompressedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO"); -CREATE TABLE LikeCompressedParquetTable LIKE CompressedParquetTable; +CREATE TABLE PropertiedParquetTable(a INT, b STRING) STORED AS PARQUET + TBLPROPERTIES("parquet.compression"="LZO","parquet.enable.dictionary"="true", + "parquet.enable.dictionary"="true","parquet.block.size"="12345"); +CREATE TABLE LikePropertiedParquetTable LIKE PropertiedParquetTable; -DESCRIBE FORMATTED LikeCompressedParquetTable; +DESCRIBE FORMATTED LikePropertiedParquetTable; + +CREATE TABLE NonePropertiedParquetTable(a INT, b STRING) STORED AS PARQUET; +CREATE TABLE LikeNonePropertiedParquetTable LIKE NonePropertiedParquetTable; + +DESCRIBE FORMATTED LikeNonePropertiedParquetTable; diff --git ql/src/test/results/clientpositive/create_like.q.out ql/src/test/results/clientpositive/create_like.q.out index 0c82cea..99ceb68 100644 --- ql/src/test/results/clientpositive/create_like.q.out +++ ql/src/test/results/clientpositive/create_like.q.out @@ -405,28 +405,32 @@ Bucket Columns: [] Sort Columns: [] Storage Desc Params: 
 	serialization.format	1
-PREHOOK: query: CREATE TABLE CompressedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO")
+PREHOOK: query: CREATE TABLE PropertiedParquetTable(a INT, b STRING) STORED AS PARQUET
+  TBLPROPERTIES("parquet.compression"="LZO",
+    "parquet.enable.dictionary"="true","parquet.block.size"="12345")
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
-PREHOOK: Output: default@CompressedParquetTable
-POSTHOOK: query: CREATE TABLE CompressedParquetTable(a INT, b STRING) STORED AS PARQUET TBLPROPERTIES("parquet.compression"="LZO")
+PREHOOK: Output: default@PropertiedParquetTable
+POSTHOOK: query: CREATE TABLE PropertiedParquetTable(a INT, b STRING) STORED AS PARQUET
+  TBLPROPERTIES("parquet.compression"="LZO",
+    "parquet.enable.dictionary"="true","parquet.block.size"="12345")
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
-POSTHOOK: Output: default@CompressedParquetTable
-PREHOOK: query: CREATE TABLE LikeCompressedParquetTable LIKE CompressedParquetTable
+POSTHOOK: Output: default@PropertiedParquetTable
+PREHOOK: query: CREATE TABLE LikePropertiedParquetTable LIKE PropertiedParquetTable
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
-PREHOOK: Output: default@LikeCompressedParquetTable
-POSTHOOK: query: CREATE TABLE LikeCompressedParquetTable LIKE CompressedParquetTable
+PREHOOK: Output: default@LikePropertiedParquetTable
+POSTHOOK: query: CREATE TABLE LikePropertiedParquetTable LIKE PropertiedParquetTable
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
-POSTHOOK: Output: default@LikeCompressedParquetTable
-PREHOOK: query: DESCRIBE FORMATTED LikeCompressedParquetTable
+POSTHOOK: Output: default@LikePropertiedParquetTable
+PREHOOK: query: DESCRIBE FORMATTED LikePropertiedParquetTable
 PREHOOK: type: DESCTABLE
-PREHOOK: Input: default@likecompressedparquettable
-POSTHOOK: query: DESCRIBE FORMATTED LikeCompressedParquetTable
+PREHOOK: Input: default@likepropertiedparquettable
+POSTHOOK: query: DESCRIBE FORMATTED LikePropertiedParquetTable
 POSTHOOK: type: DESCTABLE
-POSTHOOK: Input: default@likecompressedparquettable
+POSTHOOK: Input: default@likepropertiedparquettable
 # col_name            	data_type           	comment
 
 a                   	int
@@ -440,7 +444,56 @@ Retention:          	0
 #### A masked pattern was here ####
 Table Type:         	MANAGED_TABLE
 Table Parameters:
+	parquet.block.size  	12345
 	parquet.compression 	LZO
+	parquet.enable.dictionary	true
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library:      	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
+InputFormat:        	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+OutputFormat:       	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
+Compressed:         	No
+Num Buckets:        	-1
+Bucket Columns:     	[]
+Sort Columns:       	[]
+Storage Desc Params:
+	serialization.format	1
+PREHOOK: query: CREATE TABLE NonePropertiedParquetTable(a INT, b STRING) STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@NonePropertiedParquetTable
+POSTHOOK: query: CREATE TABLE NonePropertiedParquetTable(a INT, b STRING) STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@NonePropertiedParquetTable
+PREHOOK: query: CREATE TABLE LikeNonePropertiedParquetTable LIKE NonePropertiedParquetTable
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@LikeNonePropertiedParquetTable
+POSTHOOK: query: CREATE TABLE LikeNonePropertiedParquetTable LIKE NonePropertiedParquetTable
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@LikeNonePropertiedParquetTable
+PREHOOK: query: DESCRIBE FORMATTED LikeNonePropertiedParquetTable
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@likenonepropertiedparquettable
+POSTHOOK: query: DESCRIBE FORMATTED LikeNonePropertiedParquetTable
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@likenonepropertiedparquettable
+# col_name            	data_type           	comment
+
+a                   	int
+b                   	string
+
+# Detailed Table Information
+Database:           	default
+#### A masked pattern was here ####
+Protect Mode:       	None
+Retention:          	0
+#### A masked pattern was here ####
+Table Type:         	MANAGED_TABLE
+Table Parameters:
 #### A masked pattern was here ####
 
 # Storage Information
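
For reference, a minimal usage sketch of the properties this patch wires through. The table name parquet_tuned and the chosen values are hypothetical; the three property keys are the ones registered in the SerDeSpec above:

CREATE TABLE parquet_tuned (id INT, payload STRING)
  STORED AS PARQUET
  TBLPROPERTIES (
    "parquet.compression"="SNAPPY",      -- codec name resolved via CompressionCodecName.fromConf
    "parquet.block.size"="134217728",    -- row-group size in bytes (here 128 MB)
    "parquet.enable.dictionary"="false"  -- turn dictionary encoding off for this table
  );

With this change, ParquetRecordWriterWrapper.initializeSerProperties copies any of these values from the table properties into the task configuration before the real ParquetOutputFormat record writer is created, so per-table settings override the job-level Parquet defaults.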