diff --git a/data/files/testParquetFile b/data/files/testParquetFile new file mode 100755 index 0000000..4662c79 Binary files /dev/null and b/data/files/testParquetFile differ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssign.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssign.java index 6a44c27..c87f9e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssign.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssign.java @@ -26,5 +26,8 @@ void assignVectorValue(VectorizedRowBatch inBatch, int batchIndex, int valueColu throws HiveException; void assignObjectValue(Object val, int destIndex) throws HiveException; + void assignOtherVectorValue(Object inBatch, int batchIndex, int valueColumn, int destIndex) + throws HiveException; + void reset(); } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java index 6f2f1af..b641dd2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java @@ -50,6 +50,9 @@ import org.apache.hadoop.io.Text; import org.apache.hive.common.util.DateUtils; +import org.apache.parquet.vector.IntColumnVector; +import org.apache.parquet.vector.RowBatch; + /** * This class is used as a static factory for VectorColumnAssign. * Is capable of building assigners from expression nodes or from object inspectors. @@ -89,6 +92,21 @@ public void assignVectorValue(VectorizedRowBatch inBatch, int batchIndex, } } + /** + * Assign the Hive VectorizedRowBatch vectors from another, similar batch or vector structure, + * such as the RowBatch that Parquet provides for Hive to consume. + * @param inBatch the source batch or vector structure (e.g. a Parquet RowBatch) + * @param batchIndex the row index within the source batch + * @param valueColumnIndex the source column index + * @param destIndex the destination row index in the output batch + * @throws HiveException + */ + @Override + public void assignOtherVectorValue(Object inBatch, int batchIndex, + int valueColumnIndex, int destIndex) throws HiveException { + throw new HiveException("Internal error: should not reach here"); + } + public VectorColumnAssign init(VectorizedRowBatch out, T cv) { this.outBatch = out; this.outCol = cv; @@ -592,4 +610,39 @@ public void assignObjectValue(Object val, int destIndex) throws HiveException { } return vcas; } + + //TODO: Support all data types. + //TODO: Handle the mismatch between the column vectors of the Hive batch and the Parquet batch. The current API only + // returns the requested columns, but Hive allocates all columns when it initializes the batch. + /** + * Build the assigners from a RowBatch of the Parquet file format.
+ * @param outputBatch the Hive batch whose column vectors are assigned + * @param inputBatch the Parquet RowBatch that supplies the column vectors + * @return one assigner per column of the output batch + * @throws HiveException + */ + public static VectorColumnAssign[] buildAssigners(VectorizedRowBatch outputBatch, + RowBatch inputBatch) throws HiveException { + VectorColumnAssign[] vcas = new VectorColumnAssign[outputBatch.numCols]; + org.apache.parquet.vector.ColumnVector[] columnVector = inputBatch.getColumns(); + for(int i = 0; i < columnVector.length; i++) { + // TODO: Use buildOtherVectorAssign() instead of buildObjectAssign(); it could assign a whole array/buffer/vector to the batch at once. + if (columnVector[i] == null) { + assert(outputBatch.cols[i] == null); + vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.VOID); + } else if (columnVector[i] instanceof IntColumnVector) { + vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.INT); + } else { + throw new HiveException("Unimplemented vector assigner for parquet column vector type " + + columnVector[i].getClass()); + } + } + return vcas; + } + + //TODO: implement it. +// private static VectorColumnAssign buildOtherVectorAssign(VectorizedRowBatch outputBatch, +// int outColIndex, PrimitiveCategory category) throws HiveException { +// return null; +// } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java index e1b6dd8..a9d2054 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java @@ -17,6 +17,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; @@ -34,7 +35,8 @@ * NOTE: With HIVE-9235 we removed "implements VectorizedParquetInputFormat" since all data types * are not currently supported. Removing the interface turns off vectorization.
*/ -public class MapredParquetInputFormat extends FileInputFormat { +public class MapredParquetInputFormat extends FileInputFormat implements VectorizedInputFormatInterface { private static final Log LOG = LogFactory.getLog(MapredParquetInputFormat.class); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java index 98691c7..04fe2a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java @@ -14,15 +14,28 @@ package org.apache.hadoop.hive.ql.io.parquet; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssign; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssignFactory; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; @@ -32,7 +45,22 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetInputSplit; +import org.apache.parquet.hadoop.ParquetVectorizedRecordReader; +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.ContextUtil; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.vector.ColumnVector; +import org.apache.parquet.vector.RowBatch; /** * Vectorized input format for Parquet files @@ -49,24 +77,44 @@ RecordReader { private static final Log LOG = LogFactory.getLog(VectorizedParquetRecordReader.class); - private final ParquetRecordReaderWrapper internalReader; - private VectorizedRowBatchCtx rbCtx; - private ObjectArrayWritable internalValues; - private NullWritable internalKey; - private VectorColumnAssign[] assigners; + private final long splitLen; // for getPos() + private int schemaSize; + private final ProjectionPusher projectionPusher; + + private ParquetVectorizedRecordReader internalReader; +
private VectorizedRowBatchCtx rbCtx; + private VectorColumnAssign[] assigners; public VectorizedParquetRecordReader( - ParquetInputFormat realInput, - FileSplit split, - JobConf conf, Reporter reporter) throws IOException, InterruptedException { - internalReader = new ParquetRecordReaderWrapper( - realInput, - split, - conf, - reporter); + ParquetInputFormat newInputFormat, + FileSplit oldSplit, + JobConf oldJobConf, Reporter reporter) throws IOException, InterruptedException { + + // Create the internal reader, which is a Parquet vectorized record reader. + this.splitLen = oldSplit.getLength(); + this.projectionPusher = new ProjectionPusher(); + + final ParquetInputSplit split = getSplit(oldSplit, oldJobConf); + + TaskAttemptID taskAttemptID = TaskAttemptID.forName(oldJobConf.get(IOConstants.MAPRED_TASK_ID)); + + if (taskAttemptID == null) { + taskAttemptID = new TaskAttemptID(); + } + + setFilter(oldJobConf); + + final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(oldJobConf, taskAttemptID); + + if (split != null) { + internalReader = (ParquetVectorizedRecordReader) newInputFormat + .createVectorizedRecordReader(split, taskContext); + internalReader.initialize(split, taskContext); + } + try { rbCtx = new VectorizedRowBatchCtx(); - rbCtx.init(conf, split); + rbCtx.init(oldJobConf, oldSplit); } catch (Exception e) { throw new RuntimeException(e); } @@ -74,7 +122,6 @@ public VectorizedParquetRecordReader( @Override public NullWritable createKey() { - internalKey = internalReader.createKey(); return NullWritable.get(); } @@ -83,7 +130,6 @@ public VectorizedRowBatch createValue() { VectorizedRowBatch outputBatch = null; try { outputBatch = rbCtx.createVectorizedRowBatch(); - internalValues = internalReader.createValue(); } catch (HiveException e) { throw new RuntimeException("Error creating a batch", e); } @@ -92,7 +138,7 @@ public VectorizedRowBatch createValue() { @Override public long getPos() throws IOException { - return internalReader.getPos(); + return (long) (splitLen * getProgress()); } @Override @@ -102,7 +148,15 @@ public void close() throws IOException { @Override public float getProgress() throws IOException { - return internalReader.getProgress(); + if (internalReader == null) { + return 1f; + } else { + try { + return internalReader.getProgress(); + } catch (final InterruptedException e) { + throw new IOException(e); + } + } } @Override @@ -115,29 +169,145 @@ public boolean next(NullWritable key, VectorizedRowBatch outputBatch) int maxSize = outputBatch.getMaxSize(); try { while (outputBatch.size < maxSize) { - if (false == internalReader.next(internalKey, internalValues)) { - outputBatch.endOfFile = true; - break; + // TODO: The hasNext() method is not yet supported by the Parquet vectorized API. +// if (!internalReader.hasNext()) { +// outputBatch.endOfFile = true; +// break; +// } + + //FIXME: Delete later. getProgress() is used here as a temporary substitute for hasNext(). + try { + if (internalReader.getProgress() == 1.0) { + break; + } + } catch (Exception e) { + // Ignore: this is only a best-effort end-of-file check. + } - Object[] values = internalValues.get(); - if (null == assigners) { - // Normally we'd build the assigners from the rbCtx.rowOI, but with Parquet - // we have a discrepancy between the metadata type (Eg. tinyint -> BYTE) and - // the writable value (IntWritable). see Parquet's ETypeConverter class.
- assigners = VectorColumnAssignFactory.buildAssigners(outputBatch, values); + RowBatch parquetBatch = new RowBatch(); + parquetBatch = internalReader.nextBatch(parquetBatch); + + //TODO: Hive should configure the Parquet batch size to match the Hive batch size. + //TODO: The Hive VectorizedRowBatch has all the columns, but the Parquet RowBatch has only the requested columns. + //assert (outputBatch.numCols == parquetBatch.getColumns().length); + + assigners = VectorColumnAssignFactory.buildAssigners(outputBatch, parquetBatch); + + ColumnVector[] columnVectors = parquetBatch.getColumns(); + int parquetBatchLength = columnVectors[0].size(); + + //FIXME: parquetBatchLength should not be larger than 1024. The handling below is temporary; this is a bug in the Parquet API. + if (parquetBatchLength >= maxSize) { + parquetBatchLength = maxSize; } - for(int i = 0; i < values.length; ++i) { - assigners[i].assignObjectValue(values[i], outputBatch.size); + // Assign the Parquet vectors to the Hive vectors. + //TODO: Set the attributes of the Hive vector during assignment, such as isNull and isRepeating. + for(int i = 0; i < columnVectors.length; ++i) { + ByteBuffer byteBuffers = columnVectors[i].decode(); + for (int j = 0; j < parquetBatchLength; j++) { + //TODO: Remove byteBuffers.getInt(); it is not generic. + //TODO: Use the new method VectorColumnAssign.assignOtherVectorValue(). + assigners[i].assignObjectValue(byteBuffers.getInt(), outputBatch.size + j); + } } - ++outputBatch.size; + + outputBatch.size += parquetBatchLength; } } catch (HiveException e) { throw new RuntimeException(e); } return outputBatch.size > 0; } + + public void setFilter(final JobConf conf) { + String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR); + String columnNamesString = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR); + if (serializedPushdown == null || columnNamesString == null || serializedPushdown.isEmpty() || + columnNamesString.isEmpty()) { + return; + } + + FilterPredicate p = + SearchArgumentFactory.create(Utilities.deserializeExpression(serializedPushdown)) + .toFilterPredicate(); + if (p != null) { + LOG.debug("Predicate filter for parquet is " + p.toString()); + ParquetInputFormat.setFilterPredicate(conf, p); + } else { + LOG.debug("No predicate filter can be generated for " + TableScanDesc.FILTER_EXPR_CONF_STR + + " with the value of " + serializedPushdown); + } + } + + /** + * Gets a ParquetInputSplit corresponding to a split given by Hive. + * + * @param oldSplit The split given by Hive + * @param conf The JobConf of the Hive job + * @return a ParquetInputSplit corresponding to the oldSplit + * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file + */ + protected ParquetInputSplit getSplit( + final InputSplit oldSplit, + final JobConf conf + ) throws IOException { + ParquetInputSplit split; + if (oldSplit instanceof FileSplit) { + final Path finalPath = ((FileSplit) oldSplit).getPath(); + final JobConf cloneJob = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent()); + + final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath); + final List<BlockMetaData> blocks = parquetMetadata.getBlocks(); + final FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); + + DataWritableReadSupport readSupport = new DataWritableReadSupport(); + final ReadSupport.ReadContext readContext = readSupport.init(new InitContext(cloneJob, + null, fileMetaData.getSchema())); + schemaSize = readSupport.getTableSchema().getFieldCount(); + final List<BlockMetaData>
splitGroup = new ArrayList<BlockMetaData>(); + final long splitStart = ((FileSplit) oldSplit).getStart(); + final long splitLength = ((FileSplit) oldSplit).getLength(); + for (final BlockMetaData block : blocks) { + final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset(); + if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) { + splitGroup.add(block); + } + } + if (splitGroup.isEmpty()) { + LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit); + split = null; + } else { + populateReadMetadata(readContext.getReadSupportMetadata(), fileMetaData, conf); + split = new ParquetInputSplit(finalPath, + splitStart, + splitLength, + ((FileSplit) oldSplit).getLocations(), + splitGroup, + readContext.getRequestedSchema().toString(), + fileMetaData.getSchema().toString(), + fileMetaData.getKeyValueMetaData(), + readContext.getReadSupportMetadata()); + } + } else { + throw new IllegalArgumentException("Unknown split type: " + oldSplit); + } + return split; + } + + /** + * Populates the read metadata, using the file metadata and the Hive configuration. + * @param metadata read metadata to populate + * @param fileMetaData Parquet file metadata + * @param conf Hive configuration + */ + private void populateReadMetadata(Map<String, String> metadata, FileMetaData fileMetaData, JobConf conf) { + metadata.put("createdBy", fileMetaData.getCreatedBy()); + metadata.put(HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION.varname, + String.valueOf(HiveConf.getBoolVar(conf, + HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION))); + } + } private final ParquetInputFormat realInput; diff --git a/ql/src/test/queries/clientpositive/vectorized_parquet_data_types.q b/ql/src/test/queries/clientpositive/vectorized_parquet_data_types.q new file mode 100644 index 0000000..8580af4 --- /dev/null +++ b/ql/src/test/queries/clientpositive/vectorized_parquet_data_types.q @@ -0,0 +1,13 @@ + +create table if not exists alltypes_parquet ( + int32_field int) stored as parquet; + +LOAD DATA LOCAL INPATH "../../data/files/testParquetFile" INTO TABLE alltypes_parquet; + +SET hive.vectorized.execution.enabled=true; + +explain select count(int32_field) + from alltypes_parquet; + +select count(int32_field) + from alltypes_parquet; diff --git a/ql/src/test/results/clientpositive/vectorized_parquet_data_types.q.out b/ql/src/test/results/clientpositive/vectorized_parquet_data_types.q.out new file mode 100644 index 0000000..35f7a2f --- /dev/null +++ b/ql/src/test/results/clientpositive/vectorized_parquet_data_types.q.out @@ -0,0 +1,80 @@ +PREHOOK: query: create table if not exists alltypes_parquet ( + int32_field int) stored as parquet +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@alltypes_parquet +POSTHOOK: query: create table if not exists alltypes_parquet ( + int32_field int) stored as parquet +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@alltypes_parquet +PREHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/testParquetFile" INTO TABLE alltypes_parquet +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@alltypes_parquet +POSTHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/testParquetFile" INTO TABLE alltypes_parquet +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@alltypes_parquet +PREHOOK: query: explain select count(int32_field) + from alltypes_parquet +PREHOOK: type: QUERY +POSTHOOK: query: explain select
count(int32_field) + from alltypes_parquet +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: alltypes_parquet + Statistics: Num rows: 103542 Data size: 414169 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: int32_field (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 103542 Data size: 414169 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count(_col0) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(int32_field) + from alltypes_parquet +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypes_parquet +#### A masked pattern was here #### +POSTHOOK: query: select count(int32_field) + from alltypes_parquet +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypes_parquet +#### A masked pattern was here #### +99612
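
Reviewer note, not part of the patch: the TODOs above leave the bulk assignment path (buildOtherVectorAssign() / assignOtherVectorValue()) unimplemented, so next() currently copies one value at a time through assignObjectValue(). The sketch below illustrates what that bulk copy could look like for INT32 columns. It is only a sketch under the patch's own assumptions: IntColumnVector.decode() and size() come from the experimental org.apache.parquet.vector API this change depends on, and ParquetIntColumnCopy with its copyIntColumn() method is a hypothetical helper, not code from this change.

import java.nio.ByteBuffer;

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.parquet.vector.IntColumnVector;

public final class ParquetIntColumnCopy {

  /**
   * Copies the decoded INT32 values of a Parquet column vector into the Hive
   * LongColumnVector at outColIndex, starting at row destOffset of the output batch.
   * Returns the number of rows copied.
   */
  public static int copyIntColumn(VectorizedRowBatch outputBatch, int outColIndex,
      IntColumnVector parquetColumn, int destOffset) {
    LongColumnVector outCol = (LongColumnVector) outputBatch.cols[outColIndex];
    ByteBuffer decoded = parquetColumn.decode();   // decoded INT32 values, 4 bytes each
    int rows = Math.min(parquetColumn.size(), outputBatch.getMaxSize() - destOffset);
    for (int row = 0; row < rows; row++) {
      // Hive keeps all integer types in a long[], so each INT32 is widened on the way in.
      outCol.vector[destOffset + row] = decoded.getInt();
    }
    // Same open item as the patch's TODOs: isNull/noNulls/isRepeating should be carried
    // over once the experimental Parquet API exposes null and repetition information.
    outCol.isRepeating = false;
    outCol.noNulls = true;   // assumption: the prototype API does not surface nulls yet
    return rows;
  }
}

Copying a whole decoded buffer per column, instead of issuing one assignObjectValue() call per value, is the gain those TODOs point at; a per-type assigner built by buildOtherVectorAssign() would wrap essentially this loop.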