diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java new file mode 100644 index 0000000..0e19e59 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.util.List; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; + +public class VectorizedBatchUtil { + + /** + * Marks the value at the given row index of a ColumnVector as null. + * @param cv the column vector to update + * @param rowIndex the row index to mark as null + */ + public static void SetNullColIsNullValue(ColumnVector cv, int rowIndex) { + cv.isNull[rowIndex] = true; + if (cv.noNulls) { + cv.noNulls = false; + } + } + + /** + * Iterates through all the column vectors and sets noNulls to the + * specified value. + * + * @param valueToSet + * noNulls value to set + * @param batch + * Batch on which noNulls is set + */ + public static void SetNoNullFields(boolean valueToSet, VectorizedRowBatch batch) { + for (int i = 0; i < batch.numCols; i++) { + batch.cols[i].noNulls = valueToSet; + } + } + + /** + * Iterates through all the columns in a given row and populates the batch. + * @param row Deserialized row object + * @param oi Object inspector for that row + * @param rowIndex Index at which the row should be added to the batch + * @param batch Vectorized batch to which the row is added at rowIndex + * @throws HiveException + */ + public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIndex, + VectorizedRowBatch batch) throws HiveException { + List fieldRefs = oi.getAllStructFieldRefs(); + // Iterate through the columns and load the batch + for (int i = 0; i < fieldRefs.size(); i++) { + Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); + ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector(); + + // Vectorization only supports PRIMITIVE data types.
Assert the same + assert (foi.getCategory() == Category.PRIMITIVE); + + // Get writable object + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi; + Object writableCol = poi.getPrimitiveWritableObject(fieldData); + + // NOTE: The default value for null fields in vectorization is 1 for int types, NaN for + // float/double. String types have no default value for null. + switch (poi.getPrimitiveCategory()) { + case SHORT: { + LongColumnVector lcv = (LongColumnVector) batch.cols[i]; + if (writableCol != null) { + lcv.vector[rowIndex] = ((ShortWritable) writableCol).get(); + lcv.isNull[rowIndex] = false; + } else { + lcv.vector[rowIndex] = 1; + SetNullColIsNullValue(lcv, rowIndex); + } + } + break; + case INT: { + LongColumnVector lcv = (LongColumnVector) batch.cols[i]; + if (writableCol != null) { + lcv.vector[rowIndex] = ((IntWritable) writableCol).get(); + lcv.isNull[rowIndex] = false; + } else { + lcv.vector[rowIndex] = 1; + SetNullColIsNullValue(lcv, rowIndex); + } + } + break; + case LONG: { + LongColumnVector lcv = (LongColumnVector) batch.cols[i]; + if (writableCol != null) { + lcv.vector[rowIndex] = ((LongWritable) writableCol).get(); + lcv.isNull[rowIndex] = false; + } else { + lcv.vector[rowIndex] = 1; + SetNullColIsNullValue(lcv, rowIndex); + } + } + break; + case FLOAT: { + DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i]; + if (writableCol != null) { + dcv.vector[rowIndex] = ((FloatWritable) writableCol).get(); + dcv.isNull[rowIndex] = false; + } else { + dcv.vector[rowIndex] = Double.NaN; + SetNullColIsNullValue(dcv, rowIndex); + } + } + break; + case DOUBLE: { + DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i]; + if (writableCol != null) { + dcv.vector[rowIndex] = ((DoubleWritable) writableCol).get(); + dcv.isNull[rowIndex] = false; + } else { + dcv.vector[rowIndex] = Double.NaN; + SetNullColIsNullValue(dcv, rowIndex); + } + } + break; + case STRING: { + BytesColumnVector bcv = (BytesColumnVector) batch.cols[i]; + if (writableCol != null) { + bcv.isNull[rowIndex] = false; + Text colText = (Text) writableCol; + bcv.setRef(rowIndex, colText.getBytes(), 0, colText.getLength()); + } else { + SetNullColIsNullValue(bcv, rowIndex); + } + } + break; + default: + throw new HiveException("Vectorizaton is not supported for datatype:" + + poi.getPrimitiveCategory()); + } + } + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java new file mode 100644 index 0000000..676b86a --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; +import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazyLong; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * VectorizedColumnarSerDe is used by Vectorized query execution engine + * for columnar based storage supported by RCFile. + */ +public class VectorizedColumnarSerDe extends ColumnarSerDe implements VectorizedSerde { + + public VectorizedColumnarSerDe() throws SerDeException { + } + + private final BytesRefArrayWritable[] byteRefArray = new BytesRefArrayWritable[VectorizedRowBatch.DEFAULT_SIZE]; + private final ObjectWritable ow = new ObjectWritable(); + private final ByteStream.Output serializeVectorStream = new ByteStream.Output(); + + /** + * Serialize a vectorized row batch + * + * @param obj + * Vectorized row batch to serialize + * @param objInspector + * The ObjectInspector for the row object + * @return The serialized Writable object + * @throws SerDeException + * @see SerDe#serialize(Object, ObjectInspector) + */ + @Override + public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector) + throws SerDeException { + try { + // Validate that the OI is of struct type + if (objInspector.getCategory() != Category.STRUCT) { + throw new UnsupportedOperationException(getClass().toString() + + " can only serialize struct types, but we got: " + + objInspector.getTypeName()); + } + + VectorizedRowBatch batch = (VectorizedRowBatch) vrg; + StructObjectInspector soi = (StructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + + // Reset the byte buffer + serializeVectorStream.reset(); + int count = 0; + int rowIndex = 0; + for (int i = 0; i < batch.size; i++) { + + // If selectedInUse is true then we need to serialize only + // the selected indexes + if (batch.selectedInUse) { + rowIndex = batch.selected[i]; + } else { + rowIndex = i; + } + + BytesRefArrayWritable byteRow = byteRefArray[i]; + int numCols = fields.size(); + + if (byteRow == null) { + byteRow = new BytesRefArrayWritable(numCols); + byteRefArray[i] = byteRow; + } + + byteRow.resetValid(numCols); + + for (int k = 0; k < numCols; k++) { + ObjectInspector foi = fields.get(k).getFieldObjectInspector(); + ColumnVector currentColVector = batch.cols[k]; + + switch (foi.getCategory()) { + case PRIMITIVE: { + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi; + if (!currentColVector.noNulls + && (currentColVector.isRepeating || currentColVector.isNull[rowIndex])) { + // The column is null hence write null value + serializeVectorStream.write(new byte[0], 0, 0); + } else { + // If here then 
the vector value is not null. + if (currentColVector.isRepeating) { + // If the vector has repeating values then set rowIndex to zero + rowIndex = 0; + } + + switch (poi.getPrimitiveCategory()) { + case SHORT: + case INT: + case LONG: + LongColumnVector lcv = (LongColumnVector) batch.cols[k]; + LazyLong.writeUTF8(serializeVectorStream, lcv.vector[rowIndex]); + break; + case FLOAT: + case DOUBLE: + DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[k]; + ByteBuffer b = Text.encode(String.valueOf(dcv.vector[rowIndex])); + serializeVectorStream.write(b.array(), 0, b.limit()); + break; + case STRING: + BytesColumnVector bcv = (BytesColumnVector) batch.cols[k]; + LazyUtils.writeEscaped(serializeVectorStream, bcv.vector[rowIndex], + bcv.start[rowIndex], + bcv.length[rowIndex], + serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams + .getNeedsEscape()); + break; + default: + throw new UnsupportedOperationException( + "Vectorization is not supported for datatype:" + + poi.getPrimitiveCategory()); + } + } + break; + } + case LIST: + case MAP: + case STRUCT: + case UNION: + throw new UnsupportedOperationException("Vectorization is not supported for datatype:" + + foi.getCategory()); + default: + throw new SerDeException("Unknown ObjectInspector category!"); + + } + + byteRow.get(k).set(serializeVectorStream.getData(), count, serializeVectorStream + .getCount() - count); + count = serializeVectorStream.getCount(); + } + + } + ow.set(byteRefArray); + } catch (Exception e) { + throw new SerDeException(e); + } + return ow; + } + + @Override + public SerDeStats getSerDeStats() { + return null; + } + + @Override + public Class getSerializedClass() { + return BytesRefArrayWritable.class; + } + + @Override + public Object deserialize(Writable blob) throws SerDeException { + + // Ideally this should throw UnsupportedOperationException, since this is a + // vectorized serde. But because the RC file reader does not yet support vectorized reading, this + // is left as it is. This method is called from VectorizedRowBatchCtx.AddRowToBatch + // to deserialize rows one by one and populate the batch. Once the RC file reader supports vectorized + // reading, this serde can become a standalone serde with no dependency on ColumnarSerDe.
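+ // A minimal sketch of the intended call pattern (identifier names here are illustrative only, not part of this patch): + //   Object row = serde.deserialize(bytesRefArrayWritable);             // lazy columnar struct from ColumnarSerDe + //   VectorizedBatchUtil.AddRowToBatch(row, structOI, rowIndex, batch); // copy the row's primitive fields into the column vectors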
+ return super.deserialize(blob); + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return cachedObjectInspector; + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + throw new UnsupportedOperationException(); + } + + /** + * Deserializes the rowBlob into Vectorized row batch + * @param rowBlob + * rowBlob row batch to deserialize + * @param rowsInBlob + * Total number of rows in rowBlob to deserialize + * @param reuseBatch + * VectorizedRowBatch to which the rows should be serialized * + * @throws SerDeException + */ + @Override + public void deserializeVector(Object rowBlob, int rowsInBlob, + VectorizedRowBatch reuseBatch) throws SerDeException { + + BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) rowBlob; + for (int i = 0; i < rowsInBlob; i++) { + Object row = deserialize(refArray[i]); + try { + VectorizedBatchUtil.AddRowToBatch(row, (StructObjectInspector) cachedObjectInspector, i, reuseBatch); + } catch (HiveException e) { + throw new SerDeException(e); + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index 46256c4..1a57766 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ -33,19 +33,12 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; @@ -239,6 +232,9 @@ public VectorizedRowBatch CreateVectorizedRowBatch() throws HiveException return result; } + + + /** * Adds the row to the batch after deserializing the row * @@ -254,122 +250,25 @@ public VectorizedRowBatch CreateVectorizedRowBatch() throws HiveException public void AddRowToBatch(int rowIndex, Writable rowBlob, VectorizedRowBatch batch) throws HiveException, SerDeException { - List fieldRefs = rawRowOI.getAllStructFieldRefs(); Object row = this.deserializer.deserialize(rowBlob); - // Iterate thru the cols and load the batch - for (int i = 0; i < fieldRefs.size(); i++) { - Object fieldData = rawRowOI.getStructFieldData(row, fieldRefs.get(i)); - ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector(); - - // Vectorization only supports PRIMITIVE data types. 
Assert the same - assert (foi.getCategory() == Category.PRIMITIVE); - - // Get writable object - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi; - Object writableCol = poi.getPrimitiveWritableObject(fieldData); - - // NOTE: The default value for null fields in vectorization is -1 for int types - switch (poi.getPrimitiveCategory()) { - case SHORT: { - LongColumnVector lcv = (LongColumnVector) batch.cols[i]; - if (writableCol != null) { - lcv.vector[rowIndex] = ((ShortWritable) writableCol).get(); - lcv.isNull[rowIndex] = false; - } else { - lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); - } - } - break; - case INT: { - LongColumnVector lcv = (LongColumnVector) batch.cols[i]; - if (writableCol != null) { - lcv.vector[rowIndex] = ((IntWritable) writableCol).get(); - lcv.isNull[rowIndex] = false; - } else { - lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); - } - } - break; - case LONG: { - LongColumnVector lcv = (LongColumnVector) batch.cols[i]; - if (writableCol != null) { - lcv.vector[rowIndex] = ((LongWritable) writableCol).get(); - lcv.isNull[rowIndex] = false; - } else { - lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); - } - } - break; - case FLOAT: { - DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i]; - if (writableCol != null) { - dcv.vector[rowIndex] = ((FloatWritable) writableCol).get(); - dcv.isNull[rowIndex] = false; - } else { - dcv.vector[rowIndex] = Double.NaN; - SetNullColIsNullValue(dcv, rowIndex); - } - } - break; - case DOUBLE: { - DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i]; - if (writableCol != null) { - dcv.vector[rowIndex] = ((DoubleWritable) writableCol).get(); - dcv.isNull[rowIndex] = false; - } else { - dcv.vector[rowIndex] = Double.NaN; - SetNullColIsNullValue(dcv, rowIndex); - } - } - break; - case STRING: { - BytesColumnVector bcv = (BytesColumnVector) batch.cols[i]; - if (writableCol != null) { - bcv.isNull[rowIndex] = false; - Text colText = (Text) writableCol; - bcv.setRef(rowIndex, colText.getBytes(), 0, colText.getLength()); - } else { - SetNullColIsNullValue(bcv, rowIndex); - } - } - break; - default: - throw new HiveException("Vectorizaton is not supported for datatype:" - + poi.getPrimitiveCategory()); - } - } + VectorizedBatchUtil.AddRowToBatch(row, this.rawRowOI, rowIndex, batch); } /** - * Iterates thru all the column vectors and sets noNull to - * specified value. 
+ * Deserialized set of rows and populates the batch * - * @param valueToSet - * noNull value to set + * @param rowBlob + * to deserialize * @param batch - * Batch on which noNull is set - */ - public void SetNoNullFields(boolean valueToSet, VectorizedRowBatch batch) { - for (int i = 0; i < batch.numCols; i++) { - batch.cols[i].noNulls = true; - } - } - - /** - * Deserialized set of rows and populates the batch - * @param rowBlob to deserialize - * @param batch Vectorized row batch which contains deserialized data + * Vectorized row batch which contains deserialized data * @throws SerDeException */ - public void ConvertRowBatchBlobToVectorizedBatch(Object rowBlob, VectorizedRowBatch batch) + public void ConvertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob, + VectorizedRowBatch batch) throws SerDeException { if (deserializer instanceof VectorizedSerde) { - batch = ((VectorizedSerde) deserializer).deserializeVector(rowBlob, - deserializer.getObjectInspector(), batch); + ((VectorizedSerde) deserializer).deserializeVector(rowBlob, rowsInBlob, batch); } else { throw new SerDeException( "Not able to deserialize row batch. Serde does not implement VectorizedSerde"); @@ -410,12 +309,4 @@ public void AddPartitionColsToBatch(VectorizedRowBatch batch) throws HiveExcepti } } } - - private void SetNullColIsNullValue(ColumnVector cv, int rowIndex) { - cv.isNull[rowIndex] = true; - if (cv.noNulls) { - cv.noNulls = false; - } - } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java index 78a9084..a5753a7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java @@ -27,7 +27,6 @@ Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector) throws SerDeException; - VectorizedRowBatch deserializeVector(Object rowBlob, ObjectInspector objInspector, - VectorizedRowBatch reuseBatch) - throws SerDeException; + void deserializeVector(Object rowBlob, int rowsInBlob, VectorizedRowBatch reuseBatch) + throws SerDeException; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java new file mode 100644 index 0000000..4bfeb20 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +/** + * CommonRCFileInputFormat. + * Wrapper class that calls the correct input format for RC file base on + * HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED parameter + */ +public class CommonRCFileInputFormat extends FileInputFormat + implements InputFormatChecker, VectorizedInputFormatInterface{ + + RCFileInputFormat rcif = + new RCFileInputFormat(); + VectorizedRCFileInputFormat vrcif = new VectorizedRCFileInputFormat(); + + private static class CommonOrcRecordReader + implements RecordReader { + + final RecordReader vrcrr; + final RecordReader rcrr; + + public CommonOrcRecordReader(RecordReader vrcrr, + RecordReader rcrr) { + this.vrcrr = vrcrr; + this.rcrr = rcrr; + } + + @Override + public void close() throws IOException { + if (vrcrr != null) { + vrcrr.close(); + } else { + rcrr.close(); + } + + } + + @Override + public Writable createKey() { + if (vrcrr != null) { + return vrcrr.createKey(); + } else { + return rcrr.createKey(); + } + } + + @Override + public Writable createValue() { + if (vrcrr != null) { + return vrcrr.createValue(); + } else { + return rcrr.createValue(); + } + } + + @Override + public long getPos() throws IOException { + if (vrcrr != null) { + return vrcrr.getPos(); + } else { + return rcrr.getPos(); + } + } + + @Override + public float getProgress() throws IOException { + if (vrcrr != null) { + return vrcrr.getProgress(); + } else { + return rcrr.getProgress(); + } + } + + @Override + public boolean next(Writable arg0, Writable arg1) throws IOException { + if (vrcrr != null) { + return vrcrr.next(NullWritable.get(), (VectorizedRowBatch) arg1); + } else { + LongWritable d = new LongWritable(); + return rcrr.next(d, (BytesRefArrayWritable) arg1); + } + } + + } + + @Override + public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList files) + throws IOException { + boolean vectorPath = + conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(), true); + if (vectorPath) { + return vrcif.validateInput(fs, conf, files); + } else { + return rcif.validateInput(fs, conf, files); + } + } + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf conf, + Reporter reporter) throws IOException { + boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(), true); + if (vectorPath) { + RecordReader vrcrr = vrcif.getRecordReader(split, conf, reporter); + return new CommonOrcRecordReader(vrcrr, null); + } else { + RecordReader rcrr = rcif.getRecordReader(split, conf, reporter); + return new CommonOrcRecordReader(null, rcrr); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java new file mode 
100644 index 0000000..faad5f2 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +/** + * A MapReduce/Hive Vectorized input format for RC files. + */ +public class VectorizedRCFileInputFormat extends FileInputFormat + implements InputFormatChecker { + + public VectorizedRCFileInputFormat() { + setMinSplitSize(SequenceFile.SYNC_INTERVAL); + } + + @Override + @SuppressWarnings("unchecked") + public RecordReader getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + + reporter.setStatus(split.toString()); + + return new VectorizedRCFileRecordReader(job, (FileSplit) split); + } + + @Override + public boolean validateInput(FileSystem fs, HiveConf conf, + ArrayList files) throws IOException { + if (files.size() <= 0) { + return false; + } + for (int fileId = 0; fileId < files.size(); fileId++) { + RCFile.Reader reader = null; + try { + reader = new RCFile.Reader(fs, files.get(fileId) + .getPath(), conf); + reader.close(); + reader = null; + } catch (IOException e) { + return false; + } finally { + if (null != reader) { + reader.close(); + } + } + } + return true; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java new file mode 100644 index 0000000..54c2438 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.WeakHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer; +import org.apache.hadoop.hive.ql.io.RCFile.Reader; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.RecordReader; + +/** + * RCFileRecordReader. + * + * @param + * @param + */ +public class VectorizedRCFileRecordReader implements RecordReader { + + private final Reader in; + private final long start; + private final long end; + private boolean more = true; + protected Configuration conf; + private final FileSplit split; + private final boolean useCache; + private VectorizedRowBatchCtx rbCtx; + private final LongWritable keyCache = new LongWritable(); + private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable(); + + private static RCFileSyncCache syncCache = new RCFileSyncCache(); + + private static final class RCFileSyncEntry { + long end; + long endSync; + } + + private static final class RCFileSyncCache { + + private final Map cache; + + public RCFileSyncCache() { + cache = Collections.synchronizedMap(new WeakHashMap()); + } + + public void put(FileSplit split, long endSync) { + Path path = split.getPath(); + long end = split.getStart() + split.getLength(); + String key = path.toString() + "+" + String.format("%d", end); + + RCFileSyncEntry entry = new RCFileSyncEntry(); + entry.end = end; + entry.endSync = endSync; + if (entry.endSync >= entry.end) { + cache.put(key, entry); + } + } + + public long get(FileSplit split) { + Path path = split.getPath(); + long start = split.getStart(); + String key = path.toString() + "+" + String.format("%d", start); + RCFileSyncEntry entry = cache.get(key); + if (entry != null) { + return entry.endSync; + } + return -1; + } + } + + public VectorizedRCFileRecordReader(Configuration conf, FileSplit split) + throws IOException { + + Path path = split.getPath(); + FileSystem fs = path.getFileSystem(conf); + this.in = new RCFile.Reader(fs, path, conf); + this.end = split.getStart() + split.getLength(); + this.conf = conf; + this.split = split; + + useCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSERCFILESYNCCACHE); + + if (split.getStart() > in.getPosition()) { + long oldSync = useCache ? 
syncCache.get(split) : -1; + if (oldSync == -1) { + in.sync(split.getStart()); // sync to start + } else { + in.seek(oldSync); + } + } + + this.start = in.getPosition(); + + more = start < end; + try { + rbCtx = new VectorizedRowBatchCtx(); + rbCtx.Init(conf, split); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public Class getKeyClass() { + return LongWritable.class; + } + + public Class getValueClass() { + return BytesRefArrayWritable.class; + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public VectorizedRowBatch createValue() { + VectorizedRowBatch result = null; + try { + result = rbCtx.CreateVectorizedRowBatch(); + // Since the record reader works only on one split and + // given a split the partition cannot change, we are setting the partition + // values only once during batch creation + rbCtx.AddPartitionColsToBatch(result); + } catch (HiveException e) { + throw new RuntimeException("Error creating a batch", e); + } + return result; + } + + public boolean nextBlock() throws IOException { + return in.nextBlock(); + } + + @Override + public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { + + // Reset every column vector's noNulls flag to true + VectorizedBatchUtil.SetNoNullFields(true, value); + int i = 0; + try { + for (; i < VectorizedRowBatch.DEFAULT_SIZE; i++) { + more = next(keyCache); + if (more) { + in.getCurrentRow(colsCache); + // Currently the RCFile reader does not support reading vectorized + // data. Populate the batch by adding one row at a time. + rbCtx.AddRowToBatch(i, (Writable) colsCache, value); + } else { + break; + } + } + } catch (Exception e) { + throw new RuntimeException("Error while getting next row", e); + } + value.size = i; + return more; + } + + protected boolean next(LongWritable key) throws IOException { + if (!more) { + return false; + } + + more = in.next(key); + + long lastSeenSyncPos = in.lastSeenSyncPos(); + + if (lastSeenSyncPos >= end) { + if (useCache) { + syncCache.put(split, lastSeenSyncPos); + } + more = false; + return more; + } + return more; + } + + /** + * Return the progress within the input split.
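+ * Progress is estimated from the reader's byte position as (in.getPosition() - start) / (end - start), capped at 1.0.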
+ * + * @return 0.0 to 1.0 of the input byte range + */ + public float getProgress() throws IOException { + if (end == start) { + return 0.0f; + } else { + return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start)); + } + } + + public long getPos() throws IOException { + return in.getPosition(); + } + + public KeyBuffer getKeyBuffer() { + return in.getCurrentKeyBufferObj(); + } + + protected void seek(long pos) throws IOException { + in.seek(pos); + } + + public void sync(long pos) throws IOException { + in.sync(pos); + } + + public void resetBuffer() { + in.resetBuffer(); + } + + public long getStart() { + return start; + } + + public void close() throws IOException { + in.close(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java index 1306710..ac2b6df 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java @@ -78,7 +78,7 @@ public void initialize(Configuration conf, Properties table) { // Parse the configuration parameters ArrayList columnNames = new ArrayList(); if (columnNameProperty != null && columnNameProperty.length() > 0) { - for(String name: columnNameProperty.split(",")) { + for (String name : columnNameProperty.split(",")) { columnNames.add(name); } } @@ -95,7 +95,7 @@ public void initialize(Configuration conf, Properties table) { } ArrayList fieldTypes = - TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); StructTypeInfo rootType = new StructTypeInfo(); rootType.setAllStructFieldNames(columnNames); rootType.setAllStructFieldTypeInfos(fieldTypes); @@ -127,6 +127,7 @@ public ObjectInspector getObjectInspector() throws SerDeException { /** * Always returns null, since serialized size doesn't make sense in the * context of ORC files. 
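+ * (The ORC writer maintains its own column statistics inside the file, so no SerDe-level stats are reported here.)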
+ * * @return null */ @Override @@ -144,8 +145,8 @@ public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspe } @Override - public VectorizedRowBatch deserializeVector(Object rowBlob, ObjectInspector objInspector, - VectorizedRowBatch reuseBatch) throws SerDeException { - return ((VectorizedRowBatch) rowBlob); + public void deserializeVector(Object rowBlob, int rowsInBatch, VectorizedRowBatch reuseBatch) + throws SerDeException { + // nothing to do here } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index bfcc765..2c20987 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -80,7 +80,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti } reader.nextBatch(value); try { - rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object)value, value); + rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object)value, value.size, value); } catch (SerDeException e) { new RuntimeException(e); } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java index 1bc6c74..90d3134 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java @@ -46,7 +46,9 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.ObjectWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.compress.DefaultCodec; import org.junit.Before; import org.junit.Test; @@ -147,7 +149,7 @@ private void WriteRCFile(FileSystem fs, Path file, Configuration conf) bytes.set(4, cu); cu = new BytesRefWritable(("Test string").getBytes("UTF-8"), 0, - ("NULL").getBytes("UTF-8").length); + ("Test string").getBytes("UTF-8").length); bytes.set(5, cu); } writer.append(bytes); @@ -169,7 +171,7 @@ private VectorizedRowBatch GetRowBatch() throws SerDeException, HiveException, I // Create the context VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx(oi, oi, serDe, null); VectorizedRowBatch batch = ctx.CreateVectorizedRowBatch(); - ctx.SetNoNullFields(true, batch); + VectorizedBatchUtil.SetNoNullFields(true, batch); // Iterate thru the rows and populate the batch LongWritable rowID = new LongWritable(); @@ -272,9 +274,19 @@ void ValidateRowBatch(VectorizedRowBatch batch) throws IOException, SerDeExcepti @Test public void TestCtx() throws Exception { + InitSerde(); WriteRCFile(this.fs, this.testFilePath, this.conf); VectorizedRowBatch batch = GetRowBatch(); ValidateRowBatch(batch); + + // Test VectorizedColumnarSerDe + VectorizedColumnarSerDe vcs = new VectorizedColumnarSerDe(); + vcs.initialize(this.conf, tbl); + Writable w = vcs.serializeVector(batch, (StructObjectInspector) serDe + .getObjectInspector()); + BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[])((ObjectWritable)w).get(); + vcs.deserializeVector(refArray, 10, batch); + ValidateRowBatch(batch); } } diff --git serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java index 11f5f07..d7de834 100644 --- 
serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java @@ -26,11 +26,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.lazy.LazyFactory; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; @@ -72,13 +70,14 @@ public String toString() { public ColumnarSerDe() throws SerDeException { } - SerDeParameters serdeParams = null; + protected SerDeParameters serdeParams = null; /** * Initialize the SerDe given the parameters. * * @see SerDe#initialize(Configuration, Properties) */ + @Override public void initialize(Configuration job, Properties tbl) throws SerDeException { serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, getClass().getName()); @@ -114,6 +113,7 @@ public void initialize(Configuration job, Properties tbl) throws SerDeException * @return The serialized Writable object * @see SerDe#serialize(Object, ObjectInspector) */ + @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { if (objInspector.getCategory() != Category.STRUCT) { diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java index 27ed4ef..6958284 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java @@ -27,18 +27,18 @@ import java.util.Arrays; import java.util.Properties; -import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; @@ -141,7 +141,7 @@ public static String convertToString(byte[] bytes, int start, int length) { * if escaped, whether a specific character needs escaping. This * array should have size of 128. 
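* Note: this method is widened from private to public in this patch so that VectorizedColumnarSerDe can escape the bytes of string columns when serializing a VectorizedRowBatch.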
*/ - private static void writeEscaped(OutputStream out, byte[] bytes, int start, + public static void writeEscaped(OutputStream out, byte[] bytes, int start, int len, boolean escaped, byte escapeChar, boolean[] needsEscape) throws IOException { if (escaped) {