diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 246170d..2a45057 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -18,16 +18,17 @@
package org.apache.hadoop.hive.ql.exec.vector;

+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/**
- * This class supports string and binary data by value reference -- i.e. each field is
+ * This class supports string and binary data by value reference -- i.e. each field is
* explicitly present, as opposed to provided by a dictionary reference.
* In some cases, all the values will be in the same byte array to begin with,
- * but this need not be the case. If each value is in a separate byte
+ * but this need not be the case. If each value is in a separate byte
* array to start with, or not all of the values are in the same original
* byte array, you can still assign data by reference into this column vector.
- * This gives flexibility to use this in multiple situations.
+ * This gives flexibility to use this in multiple situations.
*
* When setting data by reference, the caller
* is responsible for allocating the byte arrays used to hold the data.
@@ -36,23 +37,23 @@
* though that use is probably not typical.
*/
public class BytesColumnVector extends ColumnVector {
- public byte[][] vector;
+ public byte[][] vector;
public int[] start; // start offset of each field
-
+
/*
- * The length of each field. If the value repeats for every entry, then it is stored
+ * The length of each field. If the value repeats for every entry, then it is stored
* in vector[0] and isRepeating from the superclass is set to true.
*/
- public int[] length;
+ public int[] length;
private byte[] buffer; // optional buffer to use when actually copying in data
private int nextFree; // next free position in buffer
-
+
// Estimate that there will be 16 bytes per entry
static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
-
- // Proportion of extra space to provide when allocating more buffer space.
+
+ // Proportion of extra space to provide when allocating more buffer space.
static final float EXTRA_SPACE_FACTOR = (float) 1.2;
-
+
/**
* Use this constructor for normal operation.
* All column vectors should be the default size normally.
@@ -60,21 +61,21 @@
public BytesColumnVector() {
this(VectorizedRowBatch.DEFAULT_SIZE);
}
-
+
/**
* Don't call this constructor except for testing purposes.
- *
+ *
* @param size number of elements in the column vector
*/
public BytesColumnVector(int size) {
super(size);
vector = new byte[size][];
start = new int[size];
- length = new int[size];
+ length = new int[size];
}
-
+
/** Set a field by reference.
- *
+ *
* @param elementNum index within column vector to set
* @param sourceBuf container of source data
* @param start start byte position within source
@@ -85,37 +86,37 @@ public void setRef(int elementNum, byte[] sourceBuf, int start, int length) {
this.start[elementNum] = start;
this.length[elementNum] = length;
}
-
- /**
+
+ /**
* You must call initBuffer first before using setVal().
* Provide the estimated number of bytes needed to hold
* a full column vector worth of byte string data.
- *
+ *
* @param estimatedValueSize Estimated size of buffer space needed
*/
public void initBuffer(int estimatedValueSize) {
nextFree = 0;
-
+
// if buffer is already allocated, keep using it, don't re-allocate
if (buffer != null) {
return;
}
-
+
// allocate a little extra space to limit need to re-allocate
int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR);
if (bufferSize < DEFAULT_BUFFER_SIZE) {
bufferSize = DEFAULT_BUFFER_SIZE;
}
- buffer = new byte[bufferSize];
+ buffer = new byte[bufferSize];
}
-
+
/**
* Initialize buffer to default size.
*/
public void initBuffer() {
initBuffer(0);
}
-
+
/**
* @return amount of buffer space currently allocated
*/
@@ -125,13 +126,13 @@ public int bufferSize() {
}
return buffer.length;
}
-
+
/**
* Set a field by actually copying in to a local buffer.
* If you must actually copy data in to the array, use this method.
* DO NOT USE this method unless it's not practical to set data by reference with setRef().
* Setting data by reference tends to run a lot faster than copying data in.
- *
+ *
* @param elementNum index within column vector to set
* @param sourceBuf container of source data
* @param start start byte position within source
@@ -147,24 +148,24 @@ public void setVal(int elementNum, byte[] sourceBuf, int start, int length) {
this.length[elementNum] = length;
nextFree += length;
}
-
+
/**
* Increase buffer space enough to accommodate next element.
- * This uses an exponential increase mechanism to rapidly
+ * This uses an exponential increase mechanism to rapidly
* increase buffer size to enough to hold all data.
* As batches get re-loaded, buffer space allocated will quickly
* stabilize.
- *
+ *
* @param nextElemLength size of next element to be added
*/
public void increaseBufferSpace(int nextElemLength) {
-
+
// Keep doubling buffer size until there will be enough space for next element.
- int newLength = 2 * buffer.length;
+ int newLength = 2 * buffer.length;
while((nextFree + nextElemLength) > newLength) {
newLength *= 2;
}
-
+
// Allocate new buffer, copy data to it, and set buffer to new buffer.
byte[] newBuffer = new byte[newLength];
System.arraycopy(buffer, 0, newBuffer, 0, nextFree);
@@ -173,9 +174,11 @@ public void increaseBufferSpace(int nextElemLength) {
@Override
public Writable getWritableObject(int index) {
-
- // TODO finish this
- throw new UnsupportedOperationException("unfinished");
+ Text result = null;
+ if (!isNull[index]) {
+ result = new Text();
+ result.append(vector[index], start[index], length[index]);
+ }
+ return result;
}
-
}
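
The setRef()/setVal() pair above is the heart of this class: setRef() aliases bytes the
caller already owns, while setVal() copies into the class's shared buffer, which
increaseBufferSpace() grows by doubling until the next element fits. A minimal usage
sketch (the values and sizes here are illustrative, not taken from the patch):

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;

    public class BytesColumnVectorSketch {
      public static void demo() {
        BytesColumnVector col = new BytesColumnVector();
        byte[] owned = "foobar".getBytes(StandardCharsets.UTF_8);

        // Fast path: set fields by reference into an existing array ("foo", "bar").
        col.setRef(0, owned, 0, 3);
        col.setRef(1, owned, 3, 3);

        // Copy path: initBuffer() must be called before setVal().
        col.initBuffer(16);
        byte[] scratch = "baz".getBytes(StandardCharsets.UTF_8);
        col.setVal(2, scratch, 0, scratch.length);
      }
    }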
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
index fc4e53b..05f80cc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
@@ -17,13 +17,13 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
-import org.apache.hadoop.io.Text;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import org.apache.hadoop.io.Text;
+
/**
* A class that is a growable array of bytes. Growth is managed in terms of
* chunks that are allocated when needed.
@@ -237,6 +237,7 @@ public void write(OutputStream out, int offset,
}
}
+ @Override
public String toString() {
int i;
StringBuilder sb = new StringBuilder(length * 3);
@@ -266,5 +267,30 @@ public void setByteBuffer(ByteBuffer result, int offset, int length) {
currentLength = Math.min(length, chunkSize - currentOffset);
}
}
-}
+ /**
+ * Gets all the bytes of the array.
+ *
+ * @return Bytes of the array
+ */
+ public byte[] get() {
+ byte[] result = null;
+ if (length > 0) {
+ int currentChunk = 0;
+ int currentOffset = 0;
+ int currentLength = Math.min(length, chunkSize);
+ int destOffset = 0;
+ result = new byte[length];
+ int totalLength = length;
+ while (totalLength > 0) {
+ System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength);
+ destOffset += currentLength;
+ totalLength -= currentLength;
+ currentChunk += 1;
+ currentOffset = 0;
+ currentLength = Math.min(totalLength, chunkSize - currentOffset);
+ }
+ }
+ return result;
+ }
+}
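
The new get() above flattens the chunked backing storage into one contiguous array. The
same walk, extracted into standalone form (data, chunkSize, and length stand in for the
DynamicByteArray fields of the same names):

    /** Flatten a chunked byte[][] into a single contiguous array. */
    static byte[] flatten(byte[][] data, int chunkSize, int length) {
      byte[] result = new byte[length];
      int destOffset = 0;
      int chunk = 0;
      int remaining = length;
      while (remaining > 0) {
        // Every chunk is read from offset 0 and is full except possibly the last.
        int copyLength = Math.min(remaining, chunkSize);
        System.arraycopy(data[chunk], 0, result, destOffset, copyLength);
        destOffset += copyLength;
        remaining -= copyLength;
        chunk += 1;
      }
      return result;
    }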
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
index 05240ce..dba9071 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
@@ -19,6 +19,8 @@
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
/**
* A row-by-row iterator for ORC files.
*/
@@ -39,6 +41,16 @@
Object next(Object previous) throws IOException;
/**
+ * Read the next row batch. The size of the batch to read cannot be controlled
+ * by the callers. Callers need to look at VectorizedRowBatch.size of the returned
+ * object to know the batch size that was read.
+ * @param previousBatch a row batch object that can be reused by the reader
+ * @return the row batch that was read
+ * @throws java.io.IOException
+ */
+ VectorizedRowBatch nextBatch(VectorizedRowBatch previousBatch) throws IOException;
+
+ /**
* Get the row number of the row that will be returned by the following
* call to next().
* @return the row number from 0 to the number of rows in the file
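
The intended consumption pattern for nextBatch() is a reuse loop -- a minimal sketch,
assuming rows is a RecordReader and relying on the interface's existing hasNext() and
close():

    VectorizedRowBatch batch = null;
    while (rows.hasNext()) {
      // Hand the previous batch back in so its arrays can be reused.
      batch = rows.nextBatch(batch);
      for (int r = 0; r < batch.size; ++r) {
        // ... process row r of batch ...
      }
    }
    rows.close();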
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index d044cd8..336dfcf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -30,6 +30,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -164,6 +169,31 @@ Object next(Object previous) throws IOException {
}
return previous;
}
+ /**
+ * Populates the isNull vector array in the previousVector object based on
+ * the present stream values. This function is called from all the child
+ * readers, and they all set the values based on the isNull field value.
+ * @param previousVector The ColumnVector object whose isNull array is populated
+ * @param batchSize Size of the column vector
+ * @return previousVector, with its isNull array populated
+ * @throws IOException
+ */
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ if (present != null) {
+
+ // Set noNulls and isNull vector of the ColumnVector based on
+ // present stream
+ ColumnVector result = (ColumnVector) previousVector;
+ result.noNulls = true;
+ for (int i = 0; i < batchSize; i++) {
+ result.isNull[i] = (present.next() != 1);
+ if (result.noNulls && result.isNull[i]) {
+ result.noNulls = false;
+ }
+ }
+ }
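+
+ // If there is no present stream, every value is present; noNulls and isNull
+ // are left untouched here, so callers are assumed to reset them between batches.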
+ return previousVector;
+ }
}
private static class BooleanTreeReader extends TreeReader{
@@ -207,6 +237,12 @@ Object next(Object previous) throws IOException {
}
return result;
}
+
+ @Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation on Boolean type");
+ }
}
private static class ByteTreeReader extends TreeReader{
@@ -247,6 +283,12 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation for Byte type");
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -291,6 +333,23 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ LongColumnVector result = null;
+ if (previousVector == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ reader.nextVector(result, batchSize);
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -335,6 +394,23 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ LongColumnVector result = null;
+ if (previousVector == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ reader.nextVector(result, batchSize);
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -379,6 +455,23 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ LongColumnVector result = null;
+ if (previousVector == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ reader.nextVector(result, batchSize);
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -423,6 +516,39 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ DoubleColumnVector result = null;
+ if (previousVector == null) {
+ result = new DoubleColumnVector();
+ } else {
+ result = (DoubleColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ for (int i = 0; i < batchSize; i++) {
+ if (!result.isNull[i]) {
+ result.vector[i] = SerializationUtils.readDouble(stream);
+ } else {
+
+ // If the value is not present then set NaN
+ result.vector[i] = Double.NaN;
+ }
+ }
+
+ // Set isRepeating flag
+ result.isRepeating = true;
+ for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) {
+ if (result.vector[i] != result.vector[i + 1]) {
+ result.isRepeating = false;
+ }
+ }
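+
+ // Note: NaN != NaN, so in a batch with more than one row any null entry
+ // (stored as NaN above) forces isRepeating to false.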
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
for(int i=0; i < items; ++i) {
@@ -471,6 +597,38 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ DoubleColumnVector result = null;
+ if (previousVector == null) {
+ result = new DoubleColumnVector();
+ } else {
+ result = (DoubleColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ for (int i = 0; i < batchSize; i++) {
+ if (!result.isNull[i]) {
+ result.vector[i] = SerializationUtils.readDouble(stream);
+ } else {
+ // If the value is not present then set NaN
+ result.vector[i] = Double.NaN;
+ }
+ }
+
+ // Set isRepeating flag
+ result.isRepeating = true;
+ for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) {
+ if (result.vector[i] != result.vector[i + 1]) {
+ result.isRepeating = false;
+ }
+ }
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
stream.skip(items * 8);
@@ -531,6 +689,12 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextBatch is not supported operation for Binary type");
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long lengthToSkip = 0;
@@ -592,6 +756,12 @@ Object next(Object previous) throws IOException {
return result;
}
+ @Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation for TimeStamp type");
+ }
+
private static int parseNanos(long serialized) {
int zeros = 7 & (int) serialized;
int result = (int) serialized >>> 3;
@@ -648,6 +818,12 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation for Decimal type");
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
for(int i=0; i < items; i++) {
@@ -663,8 +839,11 @@ void skipRows(long items) throws IOException {
private int[] dictionaryOffsets;
private RunLengthIntegerReader reader;
+ private final LongColumnVector scratchlcv;
+
StringTreeReader(int columnId) {
super(columnId);
+ scratchlcv = new LongColumnVector();
}
@Override
@@ -725,14 +904,7 @@ Object next(Object previous) throws IOException {
result = (Text) previous;
}
int offset = dictionaryOffsets[entry];
- int length;
- // if it isn't the last entry, subtract the offsets otherwise use
- // the buffer length.
- if (entry < dictionaryOffsets.length - 1) {
- length = dictionaryOffsets[entry + 1] - offset;
- } else {
- length = dictionaryBuffer.size() - offset;
- }
+ int length = getDictionaryEntryLength(entry, offset);
// If the column is just empty strings, the size will be zero, so the buffer will be null,
// in that case just return result as it will default to empty
if (dictionaryBuffer != null) {
@@ -745,6 +917,62 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ BytesColumnVector result = null;
+ int offset = 0, length = 0;
+ if (previousVector == null) {
+ result = new BytesColumnVector();
+ } else {
+ result = (BytesColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ byte[] dictionaryBytes = dictionaryBuffer.get();
+
+ // Read string offsets
+ scratchlcv.isNull = result.isNull;
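+ // scratchlcv now shares result's null bitmap, the intent presumably being that
+ // the offset reader can tell which positions actually carry dictionary offsets.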
+ reader.nextVector(scratchlcv, batchSize);
+ if (!scratchlcv.isRepeating) {
+
+ // The vector has non-repeating strings. Iterate through the batch
+ // and set the strings one by one.
+ for (int i = 0; i < batchSize; i++) {
+ if (!scratchlcv.isNull[i]) {
+ offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
+ length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
+ result.setRef(i, dictionaryBytes, offset, length);
+ } else {
+ // If the value is null then set offset and length to zero (null string)
+ result.setRef(i, dictionaryBytes, 0, 0);
+ }
+ }
+ } else {
+ // If the value is repeating then just set the first value in the
+ // vector and set the isRepeating flag to true. No need to iterate through and
+ // set all the elements to the same value
+ offset = dictionaryOffsets[(int) scratchlcv.vector[0]];
+ length = getDictionaryEntryLength((int) scratchlcv.vector[0], offset);
+ result.setRef(0, dictionaryBytes, offset, length);
+ }
+ result.isRepeating = scratchlcv.isRepeating;
+ return result;
+ }
+
+ int getDictionaryEntryLength(int entry, int offset) {
+ int length = 0;
+ // if it isn't the last entry, subtract the offsets otherwise use
+ // the buffer length.
+ if (entry < dictionaryOffsets.length - 1) {
+ length = dictionaryOffsets[entry + 1] - offset;
+ } else {
+ length = dictionaryBuffer.size() - offset;
+ }
+ return length;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -799,6 +1027,28 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ ColumnVector[] result = null;
+ if (previousVector == null) {
+ result = new ColumnVector[fields.length];
+ } else {
+ result = (ColumnVector[]) previousVector;
+ }
+
+ // Read all the members of struct as column vectors
+ for (int i = 0; i < fields.length; i++) {
+ if (fields[i] != null) {
+ if (result[i] == null) {
+ result[i] = (ColumnVector) fields[i].nextVector(null, batchSize);
+ } else {
+ fields[i].nextVector(result[i], batchSize);
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
void startStripe(Map