diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 031e178..46256c4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -50,8 +50,10 @@
 import org.apache.hadoop.mapred.FileSplit;
 
 /**
- * Context for Vectorized row batch. this calss does eager deserialization of row data using serde in the RecordReader layer.
- * It has supports partitions in this layer so that the vectorized batch is populated correctly with the partition column.
+ * Context for Vectorized row batch. This class does eager deserialization of row data using serde
+ * in the RecordReader layer.
+ * It supports partitions in this layer so that the vectorized batch is populated correctly with
+ * the partition column.
  * VectorizedRowBatchCtx.
  *
  */
@@ -356,10 +358,22 @@ public void SetNoNullFields(boolean valueToSet, VectorizedRowBatch batch) {
     }
   }
 
-  public void ConvertRowBatchBlobToVectorizedBatch(Writable[] rowBlobs, VectorizedRowBatch batch) {
-    // No reader supports this operation. If a reader returns a set of rows then
-    // this function can be used to converts that row blob batch into vectorized batch.
-    throw new UnsupportedOperationException();
+  /**
+   * Deserializes a set of rows and populates the batch
+   * @param rowBlob blob to deserialize
+   * @param batch vectorized row batch which receives the deserialized data
+   * @throws SerDeException
+   */
+  public void ConvertRowBatchBlobToVectorizedBatch(Object rowBlob, VectorizedRowBatch batch)
+      throws SerDeException {
+
+    if (deserializer instanceof VectorizedSerde) {
+      batch = ((VectorizedSerde) deserializer).deserializeVector(rowBlob,
+          deserializer.getObjectInspector(), batch);
+    } else {
+      throw new SerDeException(
+          "Not able to deserialize row batch. Serde does not implement VectorizedSerde");
+    }
   }
 
   private int GetColIndexBasedOnColName(String colName) throws HiveException
@@ -384,14 +398,16 @@ public void AddPartitionColsToBatch(VectorizedRowBatch batch) throws HiveExcepti
     int colIndex;
     String value;
     BytesColumnVector bcv;
-    for (String key : partitionValues.keySet()) {
-      colIndex = GetColIndexBasedOnColName(key);
-      value = partitionValues.get(key);
-      bcv = (BytesColumnVector) batch.cols[colIndex];
-      bcv.setRef(0, value.getBytes(), 0, value.length());
-      bcv.isRepeating = true;
-      bcv.isNull[0] = false;
-      bcv.noNulls = true;
+    if (partitionValues != null) {
+      for (String key : partitionValues.keySet()) {
+        colIndex = GetColIndexBasedOnColName(key);
+        value = partitionValues.get(key);
+        bcv = (BytesColumnVector) batch.cols[colIndex];
+        bcv.setRef(0, value.getBytes(), 0, value.length());
+        bcv.isRepeating = true;
+        bcv.isNull[0] = false;
+        bcv.noNulls = true;
+      }
     }
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
index 0f3e636..78a9084 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
@@ -27,4 +27,7 @@
   Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
       throws SerDeException;
 
+  VectorizedRowBatch deserializeVector(Object rowBlob, ObjectInspector objInspector,
+      VectorizedRowBatch reuseBatch)
+      throws SerDeException;
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
index 579d8a7..1306710 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
@@ -142,4 +142,10 @@ public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspe
     }
     return vos.serialize(vrg, objInspector);
   }
+
+  @Override
+  public VectorizedRowBatch deserializeVector(Object rowBlob, ObjectInspector objInspector,
+      VectorizedRowBatch reuseBatch) throws SerDeException {
+    return ((VectorizedRowBatch) rowBlob);
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index c8fd293..3559a28 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -28,8 +28,11 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -49,24 +52,23 @@
     private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
     private final long offset;
     private final long length;
-    private final int numColumns;
     private float progress = 0.0f;
-    private final OrcStruct rowObj;
-    private final List<OrcProto.Type> types;
+    private VectorizedRowBatchCtx rbCtx;
 
     VectorizedOrcRecordReader(Reader file, Configuration conf,
-        long offset, long length) throws IOException {
+        FileSplit fileSplit) throws IOException {
+
+      this.offset = fileSplit.getStart();
+      this.length = fileSplit.getLength();
       this.reader = file.rows(offset, length,
           findIncludedColumns(file.getTypes(), conf));
-      types = file.getTypes();
-      if (types.size() == 0) {
-        numColumns = 0;
-      } else {
-        numColumns = types.get(0).getSubtypesCount();
+
+      try {
+        rbCtx = new VectorizedRowBatchCtx();
+        rbCtx.Init(conf, fileSplit);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
-      this.offset = offset;
-      this.length = length;
-      rowObj = new OrcStruct(numColumns);
     }
 
     @Override
@@ -76,6 +78,11 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
         return false;
       }
       reader.nextBatch(value);
+      try {
+        rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object) value, value);
+      } catch (SerDeException e) {
+        throw new RuntimeException(e);
+      }
       progress = reader.getProgress();
       return true;
     }
@@ -87,8 +94,17 @@ public NullWritable createKey() {
 
     @Override
     public VectorizedRowBatch createValue() {
-      return new VectorizedRowBatch(numColumns,
-          VectorizedRowBatch.DEFAULT_SIZE);
+      VectorizedRowBatch result = null;
+      try {
+        result = rbCtx.CreateVectorizedRowBatch();
+        // Since the record reader works only on one split and
+        // given a split the partition cannot change, we are setting the partition
+        // values only once during batch creation
+        rbCtx.AddPartitionColsToBatch(result);
+      } catch (HiveException e) {
+        throw new RuntimeException("Error creating a batch", e);
+      }
+      return result;
     }
 
     @Override
@@ -170,8 +186,7 @@ private static void includeColumnRecursive(List<OrcProto.Type> types,
     Path path = fileSplit.getPath();
     FileSystem fs = path.getFileSystem(conf);
     reporter.setStatus(fileSplit.toString());
-    return new VectorizedOrcRecordReader(OrcFile.createReader(fs, path), conf,
-        fileSplit.getStart(), fileSplit.getLength());
+    return new VectorizedOrcRecordReader(OrcFile.createReader(fs, path), conf, fileSplit);
   }
 
   @Override
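
Reviewer note: the new VectorizedSerde.deserializeVector contract is easy to misread, so here is a minimal, self-contained sketch of the flow the ORC reader now follows (createValue -> nextBatch -> ConvertRowBatchBlobToVectorizedBatch -> deserializeVector). The Batch, VectorSerde, and OrcLikeSerde types below are simplified stand-ins invented for illustration, not the real Hive classes; the real interface also takes an ObjectInspector and throws SerDeException.

    // Toy illustration of the deserializeVector contract, NOT Hive's classes.
    final class Batch {
      int size;                        // number of rows currently in the batch
    }

    interface VectorSerde {
      // Contract: either populate reuseBatch from rowBlob and return it, or,
      // when rowBlob already *is* a vectorized batch (the ORC case), return
      // the blob itself.
      Batch deserializeVector(Object rowBlob, Batch reuseBatch);
    }

    final class OrcLikeSerde implements VectorSerde {
      @Override
      public Batch deserializeVector(Object rowBlob, Batch reuseBatch) {
        // ORC's record reader fills the batch directly, so deserialization
        // reduces to an identity cast, mirroring OrcSerde in the patch above.
        return (Batch) rowBlob;
      }
    }

    public class DeserializeVectorDemo {
      public static void main(String[] args) {
        VectorSerde serde = new OrcLikeSerde();
        Batch batch = new Batch();     // createValue(): batch is created once...
        batch.size = 1024;             // ...and filled by reader.nextBatch(batch)
        // next(): the context hands the blob to the serde; for ORC the same
        // object comes back, which is why the caller may ignore the return.
        Batch result = serde.deserializeVector(batch, batch);
        System.out.println(result == batch);  // true: identity pass-through
      }
    }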
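
Reviewer note: in ConvertRowBatchBlobToVectorizedBatch, the assignment "batch = ((VectorizedSerde) deserializer).deserializeVector(...)" only rebinds the local parameter; the caller's reference is untouched, since Java passes references by value. The patch still behaves correctly for ORC only because OrcSerde.deserializeVector returns (a cast of) the very object handed in, which the reader also mutates in place. A standalone demo of why such an assignment is invisible to callers:

    public class ParamReassignDemo {
      static void reassign(StringBuilder sb) {
        sb = new StringBuilder("replaced");   // rebinding is lost on return
      }
      public static void main(String[] args) {
        StringBuilder sb = new StringBuilder("original");
        reassign(sb);
        System.out.println(sb);               // prints "original"
      }
    }

If a future VectorizedSerde implementation allocated a fresh batch, callers of ConvertRowBatchBlobToVectorizedBatch would silently lose it; having that method return the batch, as deserializeVector itself does, would be the safer shape.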
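
Reviewer note: AddPartitionColsToBatch relies on the column-vector "repeating" encoding: a partition column is constant within a split, so the value is written once into slot 0 and isRepeating = true tells downstream operators to read slot 0 for every row. Below is a toy model of that encoding, assuming nothing beyond what the patch shows; BytesColumn is not Hive's actual BytesColumnVector, and the "ds=2013-05-01" string is a made-up partition value.

    public class RepeatingColumnDemo {
      static final int BATCH_SIZE = 1024;

      // Simplified stand-in for BytesColumnVector.
      static class BytesColumn {
        byte[][] vector = new byte[BATCH_SIZE][];
        boolean isRepeating;   // true => every row logically shares vector[0]

        byte[] get(int row) {
          return isRepeating ? vector[0] : vector[row];
        }
      }

      public static void main(String[] args) {
        BytesColumn col = new BytesColumn();
        // Analogue of bcv.setRef(0, value.getBytes(), 0, value.length()):
        col.vector[0] = "ds=2013-05-01".getBytes();
        col.isRepeating = true;
        // Any row index yields the same bytes without materializing 1024 copies.
        System.out.println(new String(col.get(0)).equals(new String(col.get(767))));
      }
    }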