diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
index e0018a5bc0..0534a285eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
@@ -69,30 +69,15 @@
     }
   }
 
-  private SerDeStats stats;
   private ObjectInspector objInspector;
-
-  private enum LAST_OPERATION {
-    SERIALIZE,
-    DESERIALIZE,
-    UNKNOWN
-  }
-
-  private LAST_OPERATION status;
-  private long serializedSize;
-  private long deserializedSize;
-
   private ParquetHiveRecord parquetRow;
 
   public ParquetHiveSerDe() {
     parquetRow = new ParquetHiveRecord();
-    stats = new SerDeStats();
   }
 
   @Override
   public final void initialize(final Configuration conf, final Properties tbl) throws SerDeException {
-
-    final TypeInfo rowTypeInfo;
     final List<String> columnNames;
     final List<TypeInfo> columnTypes;
     // Get column names and sort order
@@ -128,19 +113,11 @@ public final void initialize(final Configuration conf, final Properties tbl) thr
       }
     }
     this.objInspector = new ArrayWritableObjectInspector(completeTypeInfo, prunedTypeInfo);
-
-    // Stats part
-    serializedSize = 0;
-    deserializedSize = 0;
-    status = LAST_OPERATION.UNKNOWN;
   }
 
   @Override
   public Object deserialize(final Writable blob) throws SerDeException {
-    status = LAST_OPERATION.DESERIALIZE;
-    deserializedSize = 0;
     if (blob instanceof ArrayWritable) {
-      deserializedSize = ((ArrayWritable) blob).get().length;
       return blob;
     } else {
       return null;
@@ -163,23 +140,21 @@ public Writable serialize(final Object obj, final ObjectInspector objInspector)
     if (!objInspector.getCategory().equals(Category.STRUCT)) {
       throw new SerDeException("Cannot serialize " + objInspector.getCategory() + ". Can only serialize a struct");
     }
-    serializedSize = ((StructObjectInspector)objInspector).getAllStructFieldRefs().size();
-    status = LAST_OPERATION.SERIALIZE;
+
     parquetRow.value = obj;
     parquetRow.inspector= (StructObjectInspector)objInspector;
     return parquetRow;
   }
 
+  /**
+   * Return null for the Parquet format; stats are collected in ParquetRecordWriterWrapper when the writer
+   * gets closed.
+   *
+   * @return null
+   */
   @Override
   public SerDeStats getSerDeStats() {
-    // must be different
-    assert (status != LAST_OPERATION.UNKNOWN);
-    if (status == LAST_OPERATION.SERIALIZE) {
-      stats.setRawDataSize(serializedSize);
-    } else {
-      stats.setRawDataSize(deserializedSize);
-    }
-    return stats;
+    return null;
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
index db8a33247f..eee21cd9b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
@@ -22,6 +22,7 @@
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
@@ -32,21 +33,25 @@
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.util.Progressable;
-
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 
-public class ParquetRecordWriterWrapper implements RecordWriter<NullWritable, ParquetHiveRecord>,
-  org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
+public class ParquetRecordWriterWrapper implements RecordWriter<NullWritable, ParquetHiveRecord>, StatsProvidingRecordWriter, org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
 
   public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordWriterWrapper.class);
 
   private final org.apache.hadoop.mapreduce.RecordWriter<Void, ParquetHiveRecord> realWriter;
   private final TaskAttemptContext taskContext;
-
+  private final JobConf jobConf;
+  private final Path file;
+  private SerDeStats stats;
 
   public ParquetRecordWriterWrapper(
       final OutputFormat<Void, ParquetHiveRecord> realOutputFormat,
       final JobConf jobConf,
@@ -66,8 +71,12 @@ public ParquetRecordWriterWrapper(
 
       LOG.info("creating real writer to write at " + name);
 
+      this.jobConf = jobConf;
+      this.file = new Path(name);
+
       realWriter =
-          ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
+          ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, this.file);
+
       LOG.info("real writer: " + realWriter);
 
     } catch (final InterruptedException e) {
@@ -128,6 +137,21 @@ public void close(final Reporter reporter) throws IOException {
     } catch (final InterruptedException e) {
       throw new IOException(e);
     }
+
+    // Collect file stats
+    try {
+      ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(this.file, this.jobConf));
+      long totalSize = 0;
+      for (BlockMetaData block : reader.getFooter().getBlocks()) {
+        totalSize += block.getTotalByteSize();
+      }
+
+      stats = new SerDeStats();
+      stats.setRowCount(reader.getRecordCount());
+      stats.setRawDataSize(totalSize);
+    } catch(IOException e) {
+      // Ignore
+    }
   }
 
   @Override
@@ -149,4 +173,9 @@ public void write(final Writable w) throws IOException {
     write(null, (ParquetHiveRecord) w);
   }
 
+  @Override
+  public SerDeStats getStats() {
+    return stats;
+  }
+
 }
diff --git a/ql/src/test/queries/clientpositive/parquet_stats.q b/ql/src/test/queries/clientpositive/parquet_stats.q
new file mode 100644
index 0000000000..92eaadb293
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/parquet_stats.q
@@ -0,0 +1,12 @@
+
+DROP TABLE if exists parquet_stats;
+
+CREATE TABLE parquet_stats (
+  id int,
+  str string
+) STORED AS PARQUET;
+
+SET hive.stats.autogather=true;
+INSERT INTO parquet_stats values(0, 'this is string 0'), (1, 'string 1');
+DESC FORMATTED parquet_stats;
+
diff --git a/ql/src/test/results/clientpositive/parquet_stats.q.out b/ql/src/test/results/clientpositive/parquet_stats.q.out
new file mode 100644
index 0000000000..3bc6554bfd
--- /dev/null
+++ b/ql/src/test/results/clientpositive/parquet_stats.q.out
@@ -0,0 +1,63 @@
+PREHOOK: query: DROP TABLE if exists parquet_stats
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE if exists parquet_stats
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE parquet_stats (
+  id int,
+  str string
+) STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@parquet_stats
+POSTHOOK: query: CREATE TABLE parquet_stats (
+  id int,
+  str string
+) STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_stats
+PREHOOK: query: INSERT INTO parquet_stats values(0, 'this is string 0'), (1, 'string 1')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@parquet_stats
+POSTHOOK: query: INSERT INTO parquet_stats values(0, 'this is string 0'), (1, 'string 1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@parquet_stats
+POSTHOOK: Lineage: parquet_stats.id SCRIPT []
+POSTHOOK: Lineage: parquet_stats.str SCRIPT []
+PREHOOK: query: DESC FORMATTED parquet_stats
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@parquet_stats
+POSTHOOK: query: DESC FORMATTED parquet_stats
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@parquet_stats
+# col_name            	data_type           	comment
+id                  	int
+str                 	string
+
+# Detailed Table Information
+Database:           	default
+#### A masked pattern was here ####
+Retention:          	0
+#### A masked pattern was here ####
+Table Type:         	MANAGED_TABLE
+Table Parameters:
+	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"id\":\"true\",\"str\":\"true\"}}
+	bucketing_version   	2
+	numFiles            	1
+	numRows             	2
+	rawDataSize         	146
+	totalSize           	431
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library:      	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
+InputFormat:        	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+OutputFormat:       	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
+Compressed:         	No
+Num Buckets:        	-1
+Bucket Columns:     	[]
+Sort Columns:       	[]
+Storage Desc Params:
+	serialization.format	1
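For reference, the footer-based stats collection that close() now performs can be reproduced outside Hive with plain parquet-hadoop. The sketch below is illustrative only and not part of the patch: the file path is hypothetical, and it assumes parquet-hadoop and hadoop-common are on the classpath. It reads the same footer fields (getRecordCount() and per-row-group getTotalByteSize()) that the wrapper stores in SerDeStats as the row count and raw data size.

// Standalone sketch: print the footer-derived numbers a Parquet data file would
// contribute as numRows and rawDataSize. The path below is hypothetical.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ParquetFooterStats {
  public static void main(String[] args) throws Exception {
    Path file = new Path("/tmp/parquet_stats/000000_0");   // hypothetical data file
    Configuration conf = new Configuration();
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
      long totalSize = 0;
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        totalSize += block.getTotalByteSize();   // uncompressed size of each row group
      }
      // These two values are what ParquetRecordWriterWrapper puts into SerDeStats.
      System.out.println("numRows = " + reader.getRecordCount());
      System.out.println("rawDataSize = " + totalSize);
    }
  }
}

Run against the single file written by the parquet_stats.q INSERT, this should print the same numRows (2) and rawDataSize (146) that appear in the DESC FORMATTED output above.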