diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java
index d1a75df..122f166 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.io.parquet.writable.BinaryWritable;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -30,6 +31,7 @@
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BooleanWritable;
@@ -37,6 +39,7 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 
 /**
  * This class is used as a static factory for VectorColumnAssign.
@@ -185,10 +188,31 @@ protected void copyValue(BytesColumnVector src, int srcIndex, int destIndex) {
   public static VectorColumnAssign buildObjectAssign(VectorizedRowBatch outputBatch,
       int outColIndex, ObjectInspector objInspector) throws HiveException {
     PrimitiveObjectInspector poi = (PrimitiveObjectInspector) objInspector;
+    return buildObjectAssign(outputBatch, outColIndex, poi.getPrimitiveCategory());
+  }
+
+  public static VectorColumnAssign buildObjectAssign(VectorizedRowBatch outputBatch,
+      int outColIndex, PrimitiveCategory category) throws HiveException {
     VectorColumnAssign outVCA = null;
     ColumnVector destCol = outputBatch.cols[outColIndex];
-    if (destCol instanceof LongColumnVector) {
-      switch(poi.getPrimitiveCategory()) {
+    if (destCol == null) {
+      switch(category) {
+      case VOID:
+        outVCA = new VectorLongColumnAssign() {
+          // This is a dummy assigner
+          @Override
+          public void assignObjectValue(Object val, int destIndex) throws HiveException {
+            // This is no-op, there is no column to assign to and val is expected to be null
+            assert (val == null);
+          }
+        };
+        break;
+      default:
+        throw new HiveException("Incompatible (null) vector column and primitive category " +
+            category);
+      }
+    } else if (destCol instanceof LongColumnVector) {
+      switch(category) {
       case BOOLEAN:
         outVCA = new VectorLongColumnAssign() {
           @Override
@@ -276,11 +300,11 @@ public void assignObjectValue(Object val, int destIndex) throws HiveException {
         break;
       default:
         throw new HiveException("Incompatible Long vector column and primitive category " +
-            poi.getPrimitiveCategory());
+            category);
       }
     }
     else if (destCol instanceof DoubleColumnVector) {
-      switch(poi.getPrimitiveCategory()) {
+      switch(category) {
       case DOUBLE:
         outVCA = new VectorDoubleColumnAssign() {
           @Override
@@ -311,11 +335,26 @@ public void assignObjectValue(Object val, int destIndex) throws HiveException {
         break;
       default:
         throw new HiveException("Incompatible Double vector column and primitive category " +
-            poi.getPrimitiveCategory());
+            category);
       }
     }
     else if (destCol instanceof BytesColumnVector) {
-      switch(poi.getPrimitiveCategory()) {
+      switch(category) {
+      case BINARY:
+        outVCA = new VectorBytesColumnAssign() {
+          @Override
+          public void assignObjectValue(Object val, int destIndex) throws HiveException {
+            if (val == null) {
+              assignNull(destIndex);
+            }
+            else {
+              BinaryWritable bw = (BinaryWritable) val;
+              byte[] bytes = bw.getBytes();
+              assignBytes(bytes, 0, bytes.length, destIndex);
+            }
+          }
+        }.init(outputBatch, (BytesColumnVector) destCol);
+        break;
       case STRING:
         outVCA = new VectorBytesColumnAssign() {
           @Override
@@ -333,7 +372,7 @@ public void assignObjectValue(Object val, int destIndex) throws HiveException {
         break;
       default:
         throw new HiveException("Incompatible Bytes vector column and primitive category " +
-            poi.getPrimitiveCategory());
+            category);
       }
     }
     else {
@@ -366,4 +405,39 @@ public void assignObjectValue(Object val, int destIndex) throws HiveException {
     }
     return vcas;
   }
+
+  public static VectorColumnAssign[] buildAssigners(VectorizedRowBatch outputBatch,
+      Writable[] writables) throws HiveException {
+    VectorColumnAssign[] vcas = new VectorColumnAssign[outputBatch.numCols];
+    for (int i = 0; i < outputBatch.numCols; ++i) {
+      if (writables[i] == null) {
+        assert(outputBatch.cols[i] == null);
+        vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.VOID);
+      } else if (writables[i] instanceof ByteWritable) {
+        vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.BYTE);
+      } else if (writables[i] instanceof ShortWritable) {
+        vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.SHORT);
+      } else if (writables[i] instanceof IntWritable) {
+        vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.INT);
+      } else if (writables[i] instanceof LongWritable) {
+        vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.LONG);
+      } else if (writables[i] instanceof FloatWritable) {
+        vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.FLOAT);
+      } else if (writables[i] instanceof DoubleWritable) {
+        vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.DOUBLE);
+      } else if (writables[i] instanceof Text) {
+        vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.STRING);
+      } else if (writables[i] instanceof BinaryWritable) {
+        vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.BINARY);
+      } else if (writables[i] instanceof TimestampWritable) {
+        vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.TIMESTAMP);
+      } else if (writables[i] instanceof BooleanWritable) {
+        vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.BOOLEAN);
+      } else {
+        throw new HiveException("Unimplemented vector assigner for writable type " +
+            writables[i].getClass());
+      }
+    }
+    return vcas;
+  }
 }
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
index 0b504de..6619546 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
@@ -95,6 +95,13 @@ public VectorizedRowBatch(int numCols, int size) {
       projectedColumns[i] = i;
     }
   }
+
+  /**
+   * Returns the maximum size of the batch (number of rows it can hold)
+   */
+  public int getMaxSize() {
+    return selected.length;
+  }
 
   /**
   * Return count of qualifying rows.
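Reviewer note (illustration, not part of the patch): the snippet below exercises the two additions above in memory, VectorColumnAssignFactory.buildAssigners() picking assigners from the concrete Writable classes of a row, and VectorizedRowBatch.getMaxSize() reporting batch capacity. It assumes a 0.13-era hive-exec and hadoop-common on the classpath; the class name BuildAssignersSketch and the sample values are made up for the example.

// Illustration only (not part of the patch); assumes hive-exec and hadoop-common on the classpath.
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssign;
import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssignFactory;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public final class BuildAssignersSketch {
  public static void main(String[] args) throws Exception {
    // Two columns, capacity 1024 rows; columns are allocated by hand here
    // (VectorizedRowBatchCtx normally does this from the table metadata).
    VectorizedRowBatch batch = new VectorizedRowBatch(2, 1024);
    batch.cols[0] = new LongColumnVector(1024);
    BytesColumnVector strCol = new BytesColumnVector(1024);
    strCol.initBuffer();                          // back the bytes column with a buffer
    batch.cols[1] = strCol;

    System.out.println(batch.getMaxSize());       // capacity of the batch, 1024 here

    // One row as it would come out of ParquetRecordReaderWrapper: the assigners are
    // chosen from the runtime Writable classes, not from the declared column types.
    Writable[] row = { new IntWritable(42), new Text("hello") };
    VectorColumnAssign[] assigners = VectorColumnAssignFactory.buildAssigners(batch, row);

    assigners[0].assignObjectValue(row[0], 0);    // INT path into the long vector
    assigners[1].assignObjectValue(row[1], 0);    // STRING path into the bytes vector
    batch.size = 1;

    System.out.println(((LongColumnVector) batch.cols[0]).vector[0]);  // 42
  }
}

Building assigners from the runtime Writable types rather than from the ObjectInspector is the point of the new overload: Parquet's converters can hand back a wider Writable (for instance IntWritable for a tinyint column), so the existing ObjectInspector-driven buildObjectAssign() would pick the wrong cast.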
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index f513188..298af55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -372,4 +372,17 @@ private ColumnVector allocateColumnVector(String type, int defaultSize) {
     }
   }
 
+  public VectorColumnAssign[] buildObjectAssigners(VectorizedRowBatch outputBatch)
+      throws HiveException {
+    List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
+    assert outputBatch.numCols == fieldRefs.size();
+    VectorColumnAssign[] assigners = new VectorColumnAssign[fieldRefs.size()];
+    for(int i = 0; i < assigners.length; ++i) {
+      StructField fieldRef = fieldRefs.get(i);
+      ObjectInspector fieldOI = fieldRef.getFieldObjectInspector();
+      assigners[i] = VectorColumnAssignFactory.buildObjectAssign(
+          outputBatch, i, fieldOI);
+    }
+    return assigners;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
index d3412df..51eaca8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
@@ -14,7 +14,16 @@
 package org.apache.hadoop.hive.ql.io.parquet;
 
 import java.io.IOException;
+import java.util.ArrayList;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.io.ArrayWritable;
@@ -29,9 +38,14 @@
  * A Parquet InputFormat for Hive (with the deprecated package mapred)
  *
  */
-public class MapredParquetInputFormat extends FileInputFormat<Void, ArrayWritable> {
+public class MapredParquetInputFormat extends FileInputFormat<Void, ArrayWritable>
+  implements VectorizedInputFormatInterface {
+
+  private static final Log LOG = LogFactory.getLog(MapredParquetInputFormat.class);
 
   private final ParquetInputFormat<ArrayWritable> realInput;
+
+  private final transient VectorizedParquetInputFormat vectorizedSelf;
 
   public MapredParquetInputFormat() {
     this(new ParquetInputFormat<ArrayWritable>(DataWritableReadSupport.class));
@@ -39,8 +53,10 @@ public MapredParquetInputFormat() {
 
   protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> inputFormat) {
     this.realInput = inputFormat;
+    vectorizedSelf = new VectorizedParquetInputFormat(inputFormat);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public org.apache.hadoop.mapred.RecordReader<Void, ArrayWritable> getRecordReader(
       final org.apache.hadoop.mapred.InputSplit split,
@@ -48,7 +64,13 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
       final org.apache.hadoop.mapred.Reporter reporter
       ) throws IOException {
     try {
-      return (RecordReader) new ParquetRecordReaderWrapper(realInput, split, job, reporter);
+      if (Utilities.isVectorMode(job)) {
+        return (RecordReader) vectorizedSelf.getRecordReader(split, job, reporter);
+      }
+      else {
+        return (RecordReader)
+          new ParquetRecordReaderWrapper(realInput, split, job, reporter);
+      }
     } catch (final InterruptedException e) {
       throw new RuntimeException("Cannot create a RecordReaderWrapper", e);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
new file mode 100644
index 0000000..3b9c961
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssign;
+import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssignFactory;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import parquet.hadoop.ParquetInputFormat;
+
+/**
+ * Vectorized input format for Parquet files
+ */
+public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
+  implements VectorizedInputFormatInterface {
+
+  private static final Log LOG = LogFactory.getLog(VectorizedParquetInputFormat.class);
+
+  /**
+   * Vectorized record reader for vectorized Parquet input format
+   */
+  private static class VectorizedParquetRecordReader implements
+      RecordReader<NullWritable, VectorizedRowBatch> {
+    private static final Log LOG = LogFactory.getLog(VectorizedParquetRecordReader.class);
+
+    private final ParquetRecordReaderWrapper internalReader;
+    private VectorizedRowBatchCtx rbCtx;
+    private ArrayWritable internalValues;
+    private Void internalKey;
+    private VectorColumnAssign[] assigners;
+
+    public VectorizedParquetRecordReader(
+        ParquetInputFormat<ArrayWritable> realInput,
+        FileSplit split,
+        JobConf conf, Reporter reporter) throws IOException, InterruptedException {
+      internalReader = new ParquetRecordReaderWrapper(
+        realInput,
+        split,
+        conf,
+        reporter);
+      try {
+        rbCtx = new VectorizedRowBatchCtx();
+        rbCtx.init(conf, split);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+    }
+
+    @Override
+    public NullWritable createKey() {
+      internalKey = internalReader.createKey();
+      return NullWritable.get();
+    }
+
+    @Override
+    public VectorizedRowBatch createValue() {
+      VectorizedRowBatch outputBatch = null;
+      try {
+        outputBatch = rbCtx.createVectorizedRowBatch();
+        internalValues = internalReader.createValue();
+      } catch (HiveException e) {
+        throw new RuntimeException("Error creating a batch", e);
+      }
+      return outputBatch;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return internalReader.getPos();
+    }
+
+    @Override
+    public void close() throws IOException {
+      internalReader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return internalReader.getProgress();
+    }
+
+    @Override
+    public boolean next(NullWritable key, VectorizedRowBatch outputBatch)
+        throws IOException {
+      assert(assigners == null || outputBatch.numCols == assigners.length);
+      outputBatch.reset();
+      int maxSize = outputBatch.getMaxSize();
+      try {
+        while (outputBatch.size < maxSize) {
+          if (false == internalReader.next(internalKey, internalValues)) {
+            outputBatch.endOfFile = true;
+            break;
+          }
+          Writable[] writables = internalValues.get();
+
+          if (null == assigners) {
+            // Normally we'd build the assigners from the rbCtx.rowOI, but with Parquet
+            // we have a discrepancy between the metadata type (e.g. tinyint -> BYTE) and
+            // the writable value (IntWritable). See Parquet's ETypeConverter class.
+            assigners = VectorColumnAssignFactory.buildAssigners(outputBatch, writables);
+          }
+
+          for(int i = 0; i < outputBatch.numCols; ++i) {
+            assigners[i].assignObjectValue(writables[i], outputBatch.size);
+          }
+          ++outputBatch.size;
+        }
+      } catch (HiveException e) {
+        throw new RuntimeException(e);
+      }
+      return outputBatch.size > 0;
+    }
+  }
+
+  private final ParquetInputFormat<ArrayWritable> realInput;
+
+  public VectorizedParquetInputFormat(ParquetInputFormat<ArrayWritable> realInput) {
+    this.realInput = realInput;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(
+      InputSplit split, JobConf conf, Reporter reporter) throws IOException {
+    try {
+      return (RecordReader)
+        new VectorizedParquetRecordReader(realInput, (FileSplit) split, conf, reporter);
+    } catch (final InterruptedException e) {
+      throw new RuntimeException("Cannot create a VectorizedParquetRecordReader", e);
+    }
+  }
+
+}
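Reviewer note (illustration, not part of the patch): end to end, MapredParquetInputFormat.getRecordReader() now hands the split to vectorizedSelf whenever Utilities.isVectorMode(job) is true, and the vectorized reader re-batches the row-mode ArrayWritable values produced by ParquetRecordReaderWrapper into a VectorizedRowBatch. The sketch below only shows the shape of driving VectorizedParquetInputFormat directly: the file path and split are placeholders, and VectorizedRowBatchCtx.init() also expects the serialized plan and partition metadata in the JobConf, so treat this as the API shape rather than a standalone working test.

// Shape-only sketch (not part of the patch); placeholder path, no plan metadata in conf.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.parquet.VectorizedParquetInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import parquet.hadoop.ParquetInputFormat;

public final class VectorizedReadSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    // In a real job the split comes from getSplits(); the length would come from the FileStatus.
    FileSplit split = new FileSplit(new Path("/tmp/example.parquet"), 0, 0, (String[]) null);

    VectorizedParquetInputFormat format = new VectorizedParquetInputFormat(
        new ParquetInputFormat<ArrayWritable>(DataWritableReadSupport.class));

    RecordReader<NullWritable, VectorizedRowBatch> reader =
        format.getRecordReader(split, conf, Reporter.NULL);
    NullWritable key = reader.createKey();
    VectorizedRowBatch batch = reader.createValue();  // sized via rbCtx.createVectorizedRowBatch()

    long rows = 0;
    while (reader.next(key, batch)) {                 // fills up to batch.getMaxSize() rows per call
      rows += batch.size;
    }
    reader.close();
    System.out.println("read " + rows + " rows");
  }
}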