diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java index 80bf671..5f22e9b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java @@ -18,14 +18,22 @@ package org.apache.hadoop.hive.ql.exec.vector; +import java.io.IOException; +import java.nio.ByteBuffer; import java.sql.Timestamp; import java.util.List; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.lazy.LazyLong; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; @@ -44,7 +52,7 @@ * @param cv * @param rowIndex */ - public static void SetNullColIsNullValue(ColumnVector cv, int rowIndex) { + public static void setNullColIsNullValue(ColumnVector cv, int rowIndex) { cv.isNull[rowIndex] = true; if (cv.noNulls) { cv.noNulls = false; @@ -60,7 +68,7 @@ public static void SetNullColIsNullValue(ColumnVector cv, int rowIndex) { * @param batch * Batch on which noNull is set */ - public static void SetNoNullFields(boolean valueToSet, VectorizedRowBatch batch) { + public static void setNoNullFields(boolean valueToSet, VectorizedRowBatch batch) { for (int i = 0; i < batch.numCols; i++) { batch.cols[i].noNulls = true; } @@ -74,7 +82,7 @@ public static void SetNoNullFields(boolean valueToSet, VectorizedRowBatch batch) * @param batch Vectorized batch to which the row is added at rowIndex * @throws HiveException */ - public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIndex, + public static void addRowToBatch(Object row, StructObjectInspector oi, int rowIndex, VectorizedRowBatch batch) throws HiveException { List fieldRefs = oi.getAllStructFieldRefs(); // Iterate thru the cols and load the batch @@ -99,7 +107,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); + setNullColIsNullValue(lcv, rowIndex); } } break; @@ -110,7 +118,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); + setNullColIsNullValue(lcv, rowIndex); } } break; @@ -121,7 +129,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); + setNullColIsNullValue(lcv, rowIndex); } } break; @@ -132,7 +140,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); + setNullColIsNullValue(lcv, 
rowIndex); } } break; @@ -143,7 +151,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); + setNullColIsNullValue(lcv, rowIndex); } } break; @@ -154,7 +162,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn dcv.isNull[rowIndex] = false; } else { dcv.vector[rowIndex] = Double.NaN; - SetNullColIsNullValue(dcv, rowIndex); + setNullColIsNullValue(dcv, rowIndex); } } break; @@ -165,7 +173,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn dcv.isNull[rowIndex] = false; } else { dcv.vector[rowIndex] = Double.NaN; - SetNullColIsNullValue(dcv, rowIndex); + setNullColIsNullValue(dcv, rowIndex); } } break; @@ -177,7 +185,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); + setNullColIsNullValue(lcv, rowIndex); } } break; @@ -188,7 +196,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn Text colText = (Text) writableCol; bcv.setRef(rowIndex, colText.getBytes(), 0, colText.getLength()); } else { - SetNullColIsNullValue(bcv, rowIndex); + setNullColIsNullValue(bcv, rowIndex); } } break; @@ -199,4 +207,92 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn } } + /** + * Serializes a vector field (column data) into a byte stream. + * @param fieldOI Field object inspector + * @param currentColVector ColumnVector that contains the field to serialize + * @param rowIndex index of row in currentColVector that contains the field data + * @param out byte stream to which the field is serialized + * @param serdeParams SerDe parameters used to serialize the field + * @throws IOException + * @throws SerDeException + */ + public static void serializeVectorField(ObjectInspector fieldOI, ColumnVector currentColVector, int rowIndex, + ByteStream.Output out, SerDeParameters serdeParams) throws IOException, SerDeException { + + switch (fieldOI.getCategory()) { + case PRIMITIVE: { + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) fieldOI; + if (!currentColVector.noNulls + && (currentColVector.isRepeating || currentColVector.isNull[rowIndex])) { + // The column is null hence write null value + out.write(serdeParams.getNullSequence().getBytes(), 0, serdeParams + .getNullSequence().getLength()); + } else { + // If here then the vector value is not null. + if (currentColVector.isRepeating) { + // If the vector has repeating values then set rowIndex to zero + rowIndex = 0; + } + + switch (poi.getPrimitiveCategory()) { + case BOOLEAN: { + LongColumnVector lcv = (LongColumnVector) currentColVector; + // In vectorization true is stored as 1 and false as 0 + boolean b = lcv.vector[rowIndex] == 1 ? 
true : false; + if (b) { + out.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length); + } else { + out.write(LazyUtils.falseBytes, 0, LazyUtils.falseBytes.length); + } + } + break; + case BYTE: + case SHORT: + case INT: + case LONG: + LongColumnVector lcv = (LongColumnVector) currentColVector; + LazyLong.writeUTF8(out, lcv.vector[rowIndex]); + break; + case FLOAT: + case DOUBLE: + DoubleColumnVector dcv = (DoubleColumnVector) currentColVector; + ByteBuffer b = Text.encode(String.valueOf(dcv.vector[rowIndex])); + out.write(b.array(), 0, b.limit()); + break; + case STRING: + BytesColumnVector bcv = (BytesColumnVector) currentColVector; + LazyUtils.writeEscaped(out, bcv.vector[rowIndex], + bcv.start[rowIndex], + bcv.length[rowIndex], + serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams + .getNeedsEscape()); + break; + case TIMESTAMP: + LongColumnVector tcv = (LongColumnVector) currentColVector; + long timeInNanoSec = tcv.vector[rowIndex]; + Timestamp t = new Timestamp(0); + TimestampUtils.assignTimeInNanoSec(timeInNanoSec, t); + TimestampWritable tw = new TimestampWritable(); + tw.set(t); + LazyTimestamp.writeUTF8(out, tw); + break; + default: + throw new UnsupportedOperationException( + "Vectorization is not supported for datatype:" + + poi.getPrimitiveCategory()); + } + } + break; + } + case LIST: + case MAP: + case STRUCT: + case UNION: + throw new UnsupportedOperationException("Vectorization is not supported for datatype:" + + fieldOI.getCategory()); + default: + throw new SerDeException("Unknown ObjectInspector category!"); + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java index 69553d9..aa4e0f9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql.exec.vector; -import java.nio.ByteBuffer; -import java.sql.Timestamp; import java.util.List; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -29,17 +27,11 @@ import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.hive.serde2.lazy.LazyLong; -import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp; -import org.apache.hadoop.hive.serde2.lazy.LazyUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.ObjectWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; /** @@ -108,88 +100,11 @@ public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspe for (int p = 0; p < batch.projectionSize; p++) { int k = batch.projectedColumns[p]; ObjectInspector foi = fields.get(k).getFieldObjectInspector(); - ColumnVector currentColVector = batch.cols[k]; - - switch (foi.getCategory()) { - case PRIMITIVE: { - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi; - if (!currentColVector.noNulls - && (currentColVector.isRepeating || 
currentColVector.isNull[rowIndex])) { - // The column is null hence write null value - serializeVectorStream.write(new byte[0], 0, 0); - } else { - // If here then the vector value is not null. - if (currentColVector.isRepeating) { - // If the vector has repeating values then set rowindex to zero - rowIndex = 0; - } - - switch (poi.getPrimitiveCategory()) { - case BOOLEAN: { - LongColumnVector lcv = (LongColumnVector) batch.cols[k]; - // In vectorization true is stored as 1 and false as 0 - boolean b = lcv.vector[rowIndex] == 1 ? true : false; - if (b) { - serializeVectorStream.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length); - } else { - serializeVectorStream.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length); - } - } - break; - case BYTE: - case SHORT: - case INT: - case LONG: - LongColumnVector lcv = (LongColumnVector) batch.cols[k]; - LazyLong.writeUTF8(serializeVectorStream, lcv.vector[rowIndex]); - break; - case FLOAT: - case DOUBLE: - DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[k]; - ByteBuffer b = Text.encode(String.valueOf(dcv.vector[rowIndex])); - serializeVectorStream.write(b.array(), 0, b.limit()); - break; - case STRING: - BytesColumnVector bcv = (BytesColumnVector) batch.cols[k]; - LazyUtils.writeEscaped(serializeVectorStream, bcv.vector[rowIndex], - bcv.start[rowIndex], - bcv.length[rowIndex], - serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams - .getNeedsEscape()); - break; - case TIMESTAMP: - LongColumnVector tcv = (LongColumnVector) batch.cols[k]; - long timeInNanoSec = tcv.vector[rowIndex]; - Timestamp t = new Timestamp(0); - TimestampUtils.assignTimeInNanoSec(timeInNanoSec, t); - TimestampWritable tw = new TimestampWritable(); - tw.set(t); - LazyTimestamp.writeUTF8(serializeVectorStream, tw); - break; - default: - throw new UnsupportedOperationException( - "Vectorizaton is not supported for datatype:" - + poi.getPrimitiveCategory()); - } - } - break; - } - case LIST: - case MAP: - case STRUCT: - case UNION: - throw new UnsupportedOperationException("Vectorizaton is not supported for datatype:" - + foi.getCategory()); - default: - throw new SerDeException("Unknown ObjectInspector category!"); - - } - + VectorizedBatchUtil.serializeVectorField(foi, batch.cols[k], rowIndex, serializeVectorStream, serdeParams); byteRow.get(k).set(serializeVectorStream.getData(), count, serializeVectorStream .getCount() - count); count = serializeVectorStream.getCount(); } - } ow.set(byteRefArray); } catch (Exception e) { @@ -247,7 +162,7 @@ public void deserializeVector(Object rowBlob, int rowsInBlob, for (int i = 0; i < rowsInBlob; i++) { Object row = deserialize(refArray[i]); try { - VectorizedBatchUtil.AddRowToBatch(row, (StructObjectInspector) cachedObjectInspector, i, reuseBatch); + VectorizedBatchUtil.addRowToBatch(row, (StructObjectInspector) cachedObjectInspector, i, reuseBatch); } catch (HiveException e) { throw new SerDeException(e); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedLazySimpleSerDe.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedLazySimpleSerDe.java new file mode 100644 index 0000000..73f16a0 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedLazySimpleSerDe.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.util.List; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * + * VectorizedLazySimpleSerDe is used by vectorized query engine when processing a + * text file and used to serialize and deserialize VectorizedRowBatch. + * + */ + +public class VectorizedLazySimpleSerDe extends LazySimpleSerDe implements VectorizedSerde { + + private final Text[] textArray = new Text[VectorizedRowBatch.DEFAULT_SIZE]; + private final ObjectWritable ow = new ObjectWritable(); + private final ByteStream.Output serializeVectorStream = new ByteStream.Output(); + + public VectorizedLazySimpleSerDe() throws SerDeException { + } + + /** + * Returns the statistics after (de)serialization) + */ + @Override + public SerDeStats getSerDeStats() { + return null; + } + + /** + * Serialize a vectorized row batch + * + * @param obj + * Vectorized row batch to serialize + * @param objInspector + * The ObjectInspector for the row object + * @return The serialized Writable object that contains the array of + * serialized rows as Text Writable. 
+ * @throws SerDeException + * @see SerDe#serialize(Object, ObjectInspector) + */ + @Override + public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector) + throws SerDeException { + try { + // Validate that the OI is of struct type + if (objInspector.getCategory() != Category.STRUCT) { + throw new UnsupportedOperationException(getClass().toString() + + " can only serialize struct types, but we got: " + + objInspector.getTypeName()); + } + + VectorizedRowBatch batch = (VectorizedRowBatch) vrg; + StructObjectInspector soi = (StructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + + // Reset the byte buffer + serializeVectorStream.reset(); + int count = 0; + int rowIndex = 0; + for (int i = 0; i < batch.size; i++) { + + // If selectedInUse is true then we need to serialize only + // the selected indexes + if (batch.selectedInUse) { + rowIndex = batch.selected[i]; + } else { + rowIndex = i; + } + + Text row = textArray[i]; + + if (row == null) { + row = new Text(); + textArray[i] = row; + } + + row.clear(); + + for (int p = 0; p < batch.projectionSize; p++) { + int k = batch.projectedColumns[p]; + ObjectInspector foi = fields.get(k).getFieldObjectInspector(); + VectorizedBatchUtil.serializeVectorField(foi, batch.cols[k], rowIndex, + this.serializeVectorStream, serdeParams); + } + + row.set(serializeVectorStream.getData(), count, serializeVectorStream + .getCount() - count); + count = serializeVectorStream.getCount(); + } + ow.set(textArray); + } catch (Exception e) { + throw new SerDeException(e); + } + return ow; + } + + /** + * Deserializes the rowBlob into Vectorized row batch + * @param rowBlob + * rowBlob row batch to deserialize + * @param rowsInBlob + * Total number of rows in rowBlob to deserialize + * @param reuseBatch + * VectorizedRowBatch to which the rows should be serialized * + * @throws SerDeException + */ + @Override + public void deserializeVector(Object rowBlob, int rowsInBlob, + VectorizedRowBatch reuseBatch) throws SerDeException { + + Text[] refArray = (Text[]) rowBlob; + for (int i = 0; i < rowsInBlob; i++) { + Object row = deserialize(refArray[i]); + try { + VectorizedBatchUtil.addRowToBatch(row, (StructObjectInspector) cachedObjectInspector, i, reuseBatch); + } catch (HiveException e) { + throw new SerDeException(e); + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index 5018ea1..e470fd3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ -106,7 +106,7 @@ public VectorizedRowBatchCtx() { * @throws IllegalAccessException * @throws HiveException */ - public void Init(Configuration hiveConf, FileSplit split) throws ClassNotFoundException, + public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundException, IOException, SerDeException, InstantiationException, @@ -187,7 +187,7 @@ public void Init(Configuration hiveConf, FileSplit split) throws ClassNotFoundEx * @return VectorizedRowBatch * @throws HiveException */ - public VectorizedRowBatch CreateVectorizedRowBatch() throws HiveException + public VectorizedRowBatch createVectorizedRowBatch() throws HiveException { List fieldRefs = rowOI.getAllStructFieldRefs(); VectorizedRowBatch result = new VectorizedRowBatch(fieldRefs.size()); @@ -247,11 +247,11 @@ public VectorizedRowBatch 
CreateVectorizedRowBatch() throws HiveException * @throws HiveException * @throws SerDeException */ - public void AddRowToBatch(int rowIndex, Writable rowBlob, VectorizedRowBatch batch) + public void addRowToBatch(int rowIndex, Writable rowBlob, VectorizedRowBatch batch) throws HiveException, SerDeException { Object row = this.deserializer.deserialize(rowBlob); - VectorizedBatchUtil.AddRowToBatch(row, this.rawRowOI, rowIndex, batch); + VectorizedBatchUtil.addRowToBatch(row, this.rawRowOI, rowIndex, batch); } /** @@ -263,7 +263,7 @@ public void AddRowToBatch(int rowIndex, Writable rowBlob, VectorizedRowBatch bat * Vectorized row batch which contains deserialized data * @throws SerDeException */ - public void ConvertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob, + public void convertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob, VectorizedRowBatch batch) throws SerDeException { @@ -275,7 +275,7 @@ public void ConvertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob, } } - private int GetColIndexBasedOnColName(String colName) throws HiveException + private int getColIndexBasedOnColName(String colName) throws HiveException { List fieldRefs = rowOI.getAllStructFieldRefs(); for (int i = 0; i < fieldRefs.size(); i++) { @@ -292,14 +292,14 @@ private int GetColIndexBasedOnColName(String colName) throws HiveException * @param batch * @throws HiveException */ - public void AddPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException + public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException { int colIndex; String value; BytesColumnVector bcv; if (partitionValues != null) { for (String key : partitionValues.keySet()) { - colIndex = GetColIndexBasedOnColName(key); + colIndex = getColIndexBasedOnColName(key); value = partitionValues.get(key); bcv = (BytesColumnVector) batch.cols[colIndex]; bcv.setRef(0, value.getBytes(), 0, value.length()); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java index 4bfeb20..3e21adc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java @@ -48,13 +48,13 @@ new RCFileInputFormat(); VectorizedRCFileInputFormat vrcif = new VectorizedRCFileInputFormat(); - private static class CommonOrcRecordReader + private static class CommonRCRecordReader implements RecordReader { final RecordReader vrcrr; final RecordReader rcrr; - public CommonOrcRecordReader(RecordReader vrcrr, + public CommonRCRecordReader(RecordReader vrcrr, RecordReader rcrr) { this.vrcrr = vrcrr; this.rcrr = rcrr; @@ -136,10 +136,10 @@ public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(), true); if (vectorPath) { RecordReader vrcrr = vrcif.getRecordReader(split, conf, reporter); - return new CommonOrcRecordReader(vrcrr, null); + return new CommonRCRecordReader(vrcrr, null); } else { RecordReader rcrr = rcif.getRecordReader(split, conf, reporter); - return new CommonOrcRecordReader(null, rcrr); + return new CommonRCRecordReader(null, rcrr); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CommonTextFileInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CommonTextFileInputFormat.java new file mode 100644 index 0000000..61806c6 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/CommonTextFileInputFormat.java @@ -0,0 
+1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; + +/** + * CommonTextFileInputFormat. + * Wrapper class that calls the correct input format for Text file based on + * HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED parameter + */ +public class CommonTextFileInputFormat extends FileInputFormat implements + VectorizedInputFormatInterface{ + + TextInputFormat rcif = new TextInputFormat(); + VectorizedTextInputFormat vrcif = new VectorizedTextInputFormat(); + + private static class CommonTextRecordReader + implements RecordReader { + + final RecordReader vrcrr; + final RecordReader rcrr; + + public CommonTextRecordReader(RecordReader vrcrr, + RecordReader rcrr) { + this.vrcrr = vrcrr; + this.rcrr = rcrr; + } + + @Override + public void close() throws IOException { + if (vrcrr != null) { + vrcrr.close(); + } else { + rcrr.close(); + } + + } + + @Override + public Writable createKey() { + if (vrcrr != null) { + return vrcrr.createKey(); + } else { + return rcrr.createKey(); + } + } + + @Override + public Writable createValue() { + if (vrcrr != null) { + return vrcrr.createValue(); + } else { + return rcrr.createValue(); + } + } + + @Override + public long getPos() throws IOException { + if (vrcrr != null) { + return vrcrr.getPos(); + } else { + return rcrr.getPos(); + } + } + + @Override + public float getProgress() throws IOException { + if (vrcrr != null) { + return vrcrr.getProgress(); + } else { + return rcrr.getProgress(); + } + } + + @Override + public boolean next(Writable arg0, Writable arg1) throws IOException { + if (vrcrr != null) { + return vrcrr.next(NullWritable.get(), (VectorizedRowBatch) arg1); + } else { + LongWritable d = new LongWritable(); + return rcrr.next(d, (Text) arg1); + } + } + } + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf conf, + Reporter reporter) throws IOException { + boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(), true); + if (vectorPath) { + RecordReader vrcrr = vrcif.getRecordReader(split, conf, reporter); + return new 
CommonTextRecordReader(vrcrr, null); + } else { + RecordReader rcrr = rcif.getRecordReader(split, conf, reporter); + return new CommonTextRecordReader(null, rcrr); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedLineRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedLineRecordReader.java new file mode 100644 index 0000000..393007f --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedLineRecordReader.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.LineReader; + +/** + * Treats keys as offset in file and value as line. 
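For illustration only (not part of this patch): a minimal sketch of how a caller might drive the new text path end to end through CommonTextFileInputFormat, assuming standard org.apache.hadoop.mapred and Hive imports and an existing text file; the helper method and its file/length arguments below are hypothetical.

static void readVectorizedText(Path file, long length) throws IOException {
  JobConf conf = new JobConf();
  // With vectorization enabled, CommonTextFileInputFormat hands back a VectorizedLineRecordReader.
  conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(), true);
  FileSplit split = new FileSplit(file, 0, length, (String[]) null);

  CommonTextFileInputFormat inputFormat = new CommonTextFileInputFormat();
  RecordReader reader = inputFormat.getRecordReader(split, conf, Reporter.NULL);

  Writable key = (Writable) reader.createKey();                         // NullWritable on the vectorized path
  VectorizedRowBatch batch = (VectorizedRowBatch) reader.createValue(); // partition columns already populated
  while (reader.next(key, batch)) {
    // batch.size rows are filled in; batch.cols[i] hold the column vectors
    System.out.println("rows in this batch: " + batch.size);
  }
  reader.close();
}
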
+ */ +public class VectorizedLineRecordReader implements RecordReader { + private static final Log LOG + = LogFactory.getLog(VectorizedLineRecordReader.class.getName()); + + private CompressionCodecFactory compressionCodecs = null; + private long start; + private long pos; + private long end; + private LineReader in; + private VectorizedRowBatchCtx rbCtx; + int maxLineLength; + + public VectorizedLineRecordReader(Configuration job, + FileSplit split, byte[] recordDelimiter) throws IOException { + this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", + Integer.MAX_VALUE); + + start = split.getStart(); + end = start + split.getLength(); + final Path file = split.getPath(); + compressionCodecs = new CompressionCodecFactory(job); + final CompressionCodec codec = compressionCodecs.getCodec(file); + + // open the file and seek to the start of the split + FileSystem fs = file.getFileSystem(job); + FSDataInputStream fileIn = fs.open(split.getPath()); + boolean skipFirstLine = false; + if (codec != null) { + in = new LineReader(codec.createInputStream(fileIn), job, recordDelimiter); + end = Long.MAX_VALUE; + } else { + if (start != 0) { + skipFirstLine = true; + --start; + fileIn.seek(start); + } + in = new LineReader(fileIn, job, recordDelimiter); + } + if (skipFirstLine) { // skip first line and re-establish "start". + start += in.readLine(new Text(), 0, + (int) Math.min((long) Integer.MAX_VALUE, end - start)); + } + this.pos = start; + + // Initialize the batch context here as well, so that createValue() can build + // batches when this reader is constructed with a custom record delimiter. + try { + rbCtx = new VectorizedRowBatchCtx(); + rbCtx.init(job, split); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public VectorizedLineRecordReader(Configuration job, + FileSplit split) throws IOException { + this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", + Integer.MAX_VALUE); + start = split.getStart(); + end = start + split.getLength(); + final Path file = split.getPath(); + compressionCodecs = new CompressionCodecFactory(job); + final CompressionCodec codec = compressionCodecs.getCodec(file); + + // open the file and seek to the start of the split + FileSystem fs = file.getFileSystem(job); + FSDataInputStream fileIn = fs.open(split.getPath()); + boolean skipFirstLine = false; + if (codec != null) { + in = new LineReader(codec.createInputStream(fileIn), job); + end = Long.MAX_VALUE; + } else { + if (start != 0) { + skipFirstLine = true; + --start; + fileIn.seek(start); + } + in = new LineReader(fileIn, job); + } + if (skipFirstLine) { // skip first line and re-establish "start". 
+ start += in.readLine(new Text(), 0, + (int)Math.min((long)Integer.MAX_VALUE, end - start)); + } + this.pos = start; + + try { + rbCtx = new VectorizedRowBatchCtx(); + rbCtx.init(job, split); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public VectorizedLineRecordReader(InputStream in, long offset, long endOffset, + int maxLineLength) { + this.maxLineLength = maxLineLength; + this.in = new LineReader(in); + this.start = offset; + this.pos = offset; + this.end = endOffset; + } + + public VectorizedLineRecordReader(InputStream in, long offset, long endOffset, + Configuration job) + throws IOException{ + this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", + Integer.MAX_VALUE); + this.in = new LineReader(in, job); + this.start = offset; + this.pos = offset; + this.end = endOffset; + } + + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public VectorizedRowBatch createValue() { + VectorizedRowBatch result = null; + try { + result = rbCtx.createVectorizedRowBatch(); + // Since the record reader works only on one split and + // given a split the partition cannot change, we are setting the partition + // values only once during batch creation + rbCtx.addPartitionColsToBatch(result); + } catch (HiveException e) { + throw new RuntimeException("Error creating a batch", e); + } + return result; + } + + public synchronized boolean next(NullWritable key, VectorizedRowBatch value) + throws IOException { + + Text line = new Text(); + int i = 0; + for (; i < VectorizedRowBatch.DEFAULT_SIZE; i++) { + if (readLine(line)) { + try { + rbCtx.addRowToBatch(i, line, value); + } catch (Exception e) { + throw new RuntimeException("Error while getting next row", e); + } + } else { + break; + } + } + value.size = i; + + // Return false if the batch contains zero records + return i != 0; + } + + private boolean readLine(Text lineBuffer) throws IOException { + + lineBuffer.clear(); + while (pos < end) { + + int newSize = in.readLine(lineBuffer, maxLineLength, + Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), + maxLineLength)); + if (newSize == 0) { + return false; + } + pos += newSize; + if (newSize < maxLineLength) { + return true; + } + + // line too long. 
try again + LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); + } + + return false; + } + + /** + * Get the progress within the split + */ + public float getProgress() { + if (start == end) { + return 0.0f; + } else { + return Math.min(1.0f, (pos - start) / (float)(end - start)); + } + } + + public synchronized long getPos() throws IOException { + return pos; + } + + public synchronized void close() throws IOException { + if (in != null) { + in.close(); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java index 25b3aed..e3c47b0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java @@ -125,18 +125,18 @@ public VectorizedRCFileRecordReader(Configuration conf, FileSplit split) more = start < end; try { rbCtx = new VectorizedRowBatchCtx(); - rbCtx.Init(conf, split); + rbCtx.init(conf, split); } catch (Exception e) { throw new RuntimeException(e); } } public Class getKeyClass() { - return LongWritable.class; + return NullWritable.class; } public Class getValueClass() { - return BytesRefArrayWritable.class; + return VectorizedRowBatch.class; } @Override @@ -148,11 +148,11 @@ public NullWritable createKey() { public VectorizedRowBatch createValue() { VectorizedRowBatch result = null; try { - result = rbCtx.CreateVectorizedRowBatch(); + result = rbCtx.createVectorizedRowBatch(); // Since the record reader works only on one split and // given a split the partition cannot change, we are setting the partition // values only once during batch creation - rbCtx.AddPartitionColsToBatch(result); + rbCtx.addPartitionColsToBatch(result); } catch (HiveException e) { new RuntimeException("Error creating a batch", e); } @@ -167,7 +167,7 @@ public boolean nextBlock() throws IOException { public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { // Reset column fields noNull values to true - VectorizedBatchUtil.SetNoNullFields(true, value); + VectorizedBatchUtil.setNoNullFields(true, value); value.selectedInUse = false; for (int i = 0; i < value.numCols; i++) { value.cols[i].isRepeating = false; @@ -181,7 +181,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti in.getCurrentRow(colsCache); // Currently RCFile reader does not support reading vectorized // data. Populating the batch by adding one row at a time. - rbCtx.AddRowToBatch(i, (Writable) colsCache, value); + rbCtx.addRowToBatch(i, (Writable) colsCache, value); } else { break; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextInputFormat.java new file mode 100644 index 0000000..ad2ce66 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextInputFormat.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.Reporter; + + +/** An {@link InputFormat} for plain text files. Files are broken into lines. + * Either linefeed or carriage-return are used to signal end of line. Keys are + * the position in the file, and values are the line of text.. + */ +public class VectorizedTextInputFormat extends FileInputFormat + implements JobConfigurable { + + private CompressionCodecFactory compressionCodecs = null; + + public void configure(JobConf conf) { + compressionCodecs = new CompressionCodecFactory(conf); + } + + @Override + protected boolean isSplitable(FileSystem fs, Path file) { + return compressionCodecs.getCodec(file) == null; + } + + @Override + public VectorizedLineRecordReader getRecordReader( + InputSplit genericSplit, JobConf job, + Reporter reporter) + throws IOException { + String delimiter = job.get("textinputformat.record.delimiter", null); + String decodeDelimiter = delimiter; + if(null != delimiter){ + if (org.apache.commons.codec.binary.Base64.isArrayByteBase64(delimiter.getBytes())) { + decodeDelimiter = new String(org.apache.commons.codec.binary.Base64.decodeBase64(delimiter.getBytes())); + job.set("textinputformat.record.delimiter", decodeDelimiter); + } + return new VectorizedLineRecordReader(job, (FileSplit) genericSplit, decodeDelimiter.getBytes()); + } + reporter.setStatus(genericSplit.toString()); + return new VectorizedLineRecordReader(job, (FileSplit) genericSplit); + } +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index 2c20987..11a5b76 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -66,7 +66,7 @@ try { rbCtx = new VectorizedRowBatchCtx(); - rbCtx.Init(conf, fileSplit); + rbCtx.init(conf, fileSplit); } catch (Exception e) { throw new RuntimeException(e); } @@ -80,7 +80,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti } reader.nextBatch(value); try { - rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object)value, value.size, value); + rbCtx.convertRowBatchBlobToVectorizedBatch((Object)value, value.size, value); } catch (SerDeException e) { new RuntimeException(e); } @@ -97,11 +97,11 @@ public NullWritable createKey() { public VectorizedRowBatch createValue() { VectorizedRowBatch result = null; try { - result = rbCtx.CreateVectorizedRowBatch(); + result = rbCtx.createVectorizedRowBatch(); 
// Since the record reader works only on one split and // given a split the partition cannot change, we are setting the partition // values only once during batch creation - rbCtx.AddPartitionColsToBatch(result); + rbCtx.addPartitionColsToBatch(result); } catch (HiveException e) { new RuntimeException("Error creating a batch", e); } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java index 78ebb17..12ea004 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java @@ -203,8 +203,8 @@ private VectorizedRowBatch GetRowBatch() throws SerDeException, HiveException, I // Create the context VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx(oi, oi, serDe, null); - VectorizedRowBatch batch = ctx.CreateVectorizedRowBatch(); - VectorizedBatchUtil.SetNoNullFields(true, batch); + VectorizedRowBatch batch = ctx.createVectorizedRowBatch(); + VectorizedBatchUtil.setNoNullFields(true, batch); // Iterate thru the rows and populate the batch LongWritable rowID = new LongWritable(); @@ -213,7 +213,7 @@ private VectorizedRowBatch GetRowBatch() throws SerDeException, HiveException, I BytesRefArrayWritable cols = new BytesRefArrayWritable(); reader.getCurrentRow(cols); cols.resetValid(colCount); - ctx.AddRowToBatch(i, cols, batch); + ctx.addRowToBatch(i, cols, batch); } reader.close(); batch.size = 10; diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java index d6b31a6..29f7b53 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java @@ -67,7 +67,7 @@ public static final byte[] DefaultSeparators = {(byte) 1, (byte) 2, (byte) 3}; - private ObjectInspector cachedObjectInspector; + protected ObjectInspector cachedObjectInspector; private long serializedSize; private SerDeStats stats; @@ -167,7 +167,7 @@ public byte getEscapeChar() { } } - SerDeParameters serdeParams = null; + protected SerDeParameters serdeParams = null; /** * Initialize the SerDe given the parameters. serialization.format: separator @@ -177,6 +177,7 @@ public byte getEscapeChar() { * * @see SerDe#initialize(Configuration, Properties) */ + @Override public void initialize(Configuration job, Properties tbl) throws SerDeException { @@ -272,6 +273,7 @@ public static SerDeParameters initSerdeParams(Configuration job, * @return The deserialized row Object. * @see SerDe#deserialize(Writable) */ + @Override public Object deserialize(Writable field) throws SerDeException { if (byteArrayRef == null) { byteArrayRef = new ByteArrayRef(); @@ -297,6 +299,7 @@ public Object deserialize(Writable field) throws SerDeException { /** * Returns the ObjectInspector for the row. 
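To make the visibility change concrete, here is a minimal sketch (an assumption about the intended usage, not code from this patch) of why cachedObjectInspector and serdeParams are widened from private to protected: a subclass living in the ql vector package, as VectorizedLazySimpleSerDe does, can then feed them straight into VectorizedBatchUtil.serializeVectorField. The class and method names below are hypothetical.

package org.apache.hadoop.hive.ql.exec.vector;

import java.io.IOException;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

public class ExampleVectorizedSerDe extends LazySimpleSerDe {

  public ExampleVectorizedSerDe() throws SerDeException {
  }

  // Serialize column 'colIndex' of a batch at 'rowIndex'; assumes initialize() has already
  // run so that cachedObjectInspector and serdeParams (now protected) are populated.
  public void serializeColumn(VectorizedRowBatch batch, int colIndex, int rowIndex,
      ByteStream.Output out) throws IOException, SerDeException {
    StructObjectInspector soi = (StructObjectInspector) cachedObjectInspector;
    ObjectInspector fieldOI = soi.getAllStructFieldRefs().get(colIndex).getFieldObjectInspector();
    VectorizedBatchUtil.serializeVectorField(fieldOI, batch.cols[colIndex], rowIndex, out, serdeParams);
  }
}
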
*/ + @Override public ObjectInspector getObjectInspector() throws SerDeException { return cachedObjectInspector; } @@ -306,6 +309,7 @@ public ObjectInspector getObjectInspector() throws SerDeException { * * @see SerDe#getSerializedClass() */ + @Override public Class getSerializedClass() { return Text.class; } @@ -324,6 +328,7 @@ public ObjectInspector getObjectInspector() throws SerDeException { * @throws IOException * @see SerDe#serialize(Object, ObjectInspector) */ + @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { @@ -517,6 +522,7 @@ public static void serialize(ByteStream.Output out, Object obj, * Returns the statistics after (de)serialization) */ + @Override public SerDeStats getSerDeStats() { // must be different assert (lastOperationSerialize != lastOperationDeserialize);
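For reference, a short sketch of the per-field serialization loop that both VectorizedColumnarSerDe and the new VectorizedLazySimpleSerDe now delegate to VectorizedBatchUtil.serializeVectorField. This is illustrative only and assumes the caller already has a populated batch, its StructObjectInspector, the SerDeParameters from the table properties, and the usual serde2 imports in scope; the helper method name is hypothetical.

static void serializeOneRow(VectorizedRowBatch batch, int rowIndex, StructObjectInspector soi,
    LazySimpleSerDe.SerDeParameters serdeParams, ByteStream.Output out)
    throws IOException, SerDeException {
  List<? extends StructField> fields = soi.getAllStructFieldRefs();
  for (int p = 0; p < batch.projectionSize; p++) {
    int k = batch.projectedColumns[p];
    ObjectInspector fieldOI = fields.get(k).getFieldObjectInspector();
    int before = out.getCount();
    // Writes the text form of (column k, row rowIndex), or the configured null
    // sequence when the column vector is null at that row.
    VectorizedBatchUtil.serializeVectorField(fieldOI, batch.cols[k], rowIndex, out, serdeParams);
    // Callers slice the shared buffer with (before, out.getCount() - before), as the
    // serdes above do when filling BytesRefArrayWritable entries or Text rows.
  }
}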