diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index b99fd56..269bca0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -44,13 +44,17 @@
 import parquet.hadoop.ParquetInputSplit;
 import parquet.hadoop.api.ReadSupport.ReadContext;
 import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.FileMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.hadoop.util.ContextUtil;
+import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 
 import com.google.common.base.Strings;
 
+import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
 public class ParquetRecordReaderWrapper implements RecordReader<Void, ArrayWritable> {
   public static final Log LOG = LogFactory.getLog(ParquetRecordReaderWrapper.class);
 
@@ -229,17 +233,18 @@ public boolean next(final Void key, final ArrayWritable value) throws IOExceptio
    * @return a ParquetInputSplit corresponding to the oldSplit
    * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
    */
-  @SuppressWarnings("deprecation")
   protected ParquetInputSplit getSplit(
       final InputSplit oldSplit,
       final JobConf conf
       ) throws IOException {
     ParquetInputSplit split;
     if (oldSplit instanceof FileSplit) {
-      final Path finalPath = ((FileSplit) oldSplit).getPath();
+      FileSplit fileSplit = (FileSplit) oldSplit;
+      final Path finalPath = fileSplit.getPath();
       jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
 
-      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
+      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath,
+          NO_FILTER);
       final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
       final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
 
@@ -263,19 +268,39 @@
         if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
           skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
         }
-        split = new ParquetInputSplit(finalPath,
-            splitStart,
-            splitLength,
-            ((FileSplit) oldSplit).getLocations(),
-            splitGroup,
-            readContext.getRequestedSchema().toString(),
-            fileMetaData.getSchema().toString(),
-            fileMetaData.getKeyValueMetaData(),
-            readContext.getReadSupportMetadata());
+        split = new ParquetInputSplit(finalPath,
+            splitStart,
+            splitLength,
+            end(blocks, readContext.getRequestedSchema().toString()),
+            fileSplit.getLocations(),
+            offsets(blocks));
       }
     } else {
       throw new IllegalArgumentException("Unknown split type: " + oldSplit);
     }
     return split;
   }
+
+  private static long end(List<BlockMetaData> blocks, String requestedSchema) {
+    MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
+    long length = 0;
+
+    for (BlockMetaData block : blocks) {
+      List<ColumnChunkMetaData> columns = block.getColumns();
+      for (ColumnChunkMetaData column : columns) {
+        if (requested.containsPath(column.getPath().toArray())) {
+          length += column.getTotalSize();
+        }
+      }
+    }
+    return length;
+  }
+
+  private static long[] offsets(List<BlockMetaData> blocks) {
+    long[] offsets = new long[blocks.size()];
+    for (int i = 0; i < offsets.length; i++) {
+      offsets[i] = blocks.get(i).getStartingPos();
+    }
+    return offsets;
+  }
 }
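
For reviewers, a minimal standalone sketch (not part of the patch; the FooterInspection class name and the choice of the full file schema as the "requested" schema are illustrative assumptions only) of how the footer is read with the non-deprecated NO_FILTER overload, and how the values produced by the new end()/offsets() helpers are derived from it:

// Hypothetical illustration only; assumes a Parquet file path is passed as the
// first program argument.
import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;

public class FooterInspection {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]);

    // Same call the patch switches to: read the footer with an explicit metadata filter.
    ParquetMetadata footer = ParquetFileReader.readFooter(conf, path, NO_FILTER);
    List<BlockMetaData> blocks = footer.getBlocks();

    // What offsets() returns: the starting position of each row group.
    for (BlockMetaData block : blocks) {
      System.out.println("row group @ " + block.getStartingPos());
    }

    // What end() returns: the total size of the column chunks that belong to the
    // requested schema. Here the full file schema stands in for the projected
    // schema that the wrapper obtains from its ReadContext.
    MessageType requested = MessageTypeParser.parseMessageType(
        footer.getFileMetaData().getSchema().toString());
    long length = 0;
    for (BlockMetaData block : blocks) {
      for (ColumnChunkMetaData column : block.getColumns()) {
        if (requested.containsPath(column.getPath().toArray())) {
          length += column.getTotalSize();
        }
      }
    }
    System.out.println("projected length: " + length);
  }
}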