diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 246170d..2a45057 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -18,16 +18,17 @@
package org.apache.hadoop.hive.ql.exec.vector;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
- * This class supports string and binary data by value reference -- i.e. each field is
+ * This class supports string and binary data by value reference -- i.e. each field is
* explicitly present, as opposed to provided by a dictionary reference.
* In some cases, all the values will be in the same byte array to begin with,
- * but this need not be the case. If each value is in a separate byte
+ * but this need not be the case. If each value is in a separate byte
* array to start with, or not all of the values are in the same original
* byte array, you can still assign data by reference into this column vector.
- * This gives flexibility to use this in multiple situations.
+ * This gives flexibility to use this in multiple situations.
*
* When setting data by reference, the caller
* is responsible for allocating the byte arrays used to hold the data.
@@ -36,23 +37,23 @@
* though that use is probably not typical.
*/
public class BytesColumnVector extends ColumnVector {
- public byte[][] vector;
+ public byte[][] vector;
public int[] start; // start offset of each field
-
+
/*
- * The length of each field. If the value repeats for every entry, then it is stored
+ * The length of each field. If the value repeats for every entry, then it is stored
* in vector[0] and isRepeating from the superclass is set to true.
*/
- public int[] length;
+ public int[] length;
private byte[] buffer; // optional buffer to use when actually copying in data
private int nextFree; // next free position in buffer
-
+
// Estimate that there will be 16 bytes per entry
static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
-
- // Proportion of extra space to provide when allocating more buffer space.
+
+ // Proportion of extra space to provide when allocating more buffer space.
static final float EXTRA_SPACE_FACTOR = (float) 1.2;
-
+
/**
* Use this constructor for normal operation.
* All column vectors should be the default size normally.
@@ -60,21 +61,21 @@
public BytesColumnVector() {
this(VectorizedRowBatch.DEFAULT_SIZE);
}
-
+
/**
* Don't call this constructor except for testing purposes.
- *
+ *
* @param size number of elements in the column vector
*/
public BytesColumnVector(int size) {
super(size);
vector = new byte[size][];
start = new int[size];
- length = new int[size];
+ length = new int[size];
}
-
+
/** Set a field by reference.
- *
+ *
* @param elementNum index within column vector to set
* @param sourceBuf container of source data
* @param start start byte position within source
@@ -85,37 +86,37 @@ public void setRef(int elementNum, byte[] sourceBuf, int start, int length) {
this.start[elementNum] = start;
this.length[elementNum] = length;
}
-
- /**
+
+ /**
* You must call initBuffer first before using setVal().
* Provide the estimated number of bytes needed to hold
* a full column vector worth of byte string data.
- *
+ *
* @param estimatedValueSize Estimated size of buffer space needed
*/
public void initBuffer(int estimatedValueSize) {
nextFree = 0;
-
+
// if buffer is already allocated, keep using it, don't re-allocate
if (buffer != null) {
return;
}
-
+
// allocate a little extra space to limit need to re-allocate
int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR);
if (bufferSize < DEFAULT_BUFFER_SIZE) {
bufferSize = DEFAULT_BUFFER_SIZE;
}
- buffer = new byte[bufferSize];
+ buffer = new byte[bufferSize];
}
-
+
/**
* Initialize buffer to default size.
*/
public void initBuffer() {
initBuffer(0);
}
-
+
/**
* @return amount of buffer space currently allocated
*/
@@ -125,13 +126,13 @@ public int bufferSize() {
}
return buffer.length;
}
-
+
/**
* Set a field by actually copying in to a local buffer.
* If you must actually copy data in to the array, use this method.
* DO NOT USE this method unless it's not practical to set data by reference with setRef().
* Setting data by reference tends to run a lot faster than copying data in.
- *
+ *
* @param elementNum index within column vector to set
* @param sourceBuf container of source data
* @param start start byte position within source
@@ -147,24 +148,24 @@ public void setVal(int elementNum, byte[] sourceBuf, int start, int length) {
this.length[elementNum] = length;
nextFree += length;
}
-
+
/**
* Increase buffer space enough to accommodate next element.
- * This uses an exponential increase mechanism to rapidly
+ * This uses an exponential increase mechanism to rapidly
* increase buffer size to enough to hold all data.
* As batches get re-loaded, buffer space allocated will quickly
* stabilize.
- *
+ *
* @param nextElemLength size of next element to be added
*/
public void increaseBufferSpace(int nextElemLength) {
-
+
// Keep doubling buffer size until there will be enough space for next element.
- int newLength = 2 * buffer.length;
+ int newLength = 2 * buffer.length;
while((nextFree + nextElemLength) > newLength) {
newLength *= 2;
}
-
+
// Allocate new buffer, copy data to it, and set buffer to new buffer.
byte[] newBuffer = new byte[newLength];
System.arraycopy(buffer, 0, newBuffer, 0, nextFree);
@@ -173,9 +174,11 @@ public void increaseBufferSpace(int nextElemLength) {
@Override
public Writable getWritableObject(int index) {
-
- // TODO finish this
- throw new UnsupportedOperationException("unfinished");
+ Text result = null;
+ if (!isNull[index]) {
+ result = new Text();
+ result.append(vector[index], start[index], length[index]);
+ }
+ return result;
}
-
}
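
Note (not part of the patch): a minimal usage sketch of the two population paths the class javadoc above describes -- setRef() aliases the caller's byte array, while setVal() copies into the vector's own buffer and therefore needs initBuffer() first -- plus the newly implemented getWritableObject(). The class name and values are illustrative; it only assumes the Hive classes touched by this patch are on the classpath.

    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;

    public class BytesColumnVectorSketch {
      public static void main(String[] args) {
        byte[] source = "hellohive".getBytes();

        // By reference: no copy, the caller keeps ownership of the byte array.
        BytesColumnVector byRef = new BytesColumnVector();
        byRef.setRef(0, source, 0, 5);   // "hello"
        byRef.setRef(1, source, 5, 4);   // "hive"

        // By value: data is copied into the vector's internal buffer.
        BytesColumnVector byVal = new BytesColumnVector();
        byVal.initBuffer(16);            // estimated bytes per value
        byVal.setVal(0, source, 0, 5);
        byVal.setVal(1, source, 5, 4);

        System.out.println(byRef.getWritableObject(0)); // prints "hello"
        System.out.println(byVal.getWritableObject(1)); // prints "hive"
      }
    }
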
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
index fc4e53b..617aff8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
@@ -17,13 +17,13 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
-import org.apache.hadoop.io.Text;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import org.apache.hadoop.io.Text;
+
/**
* A class that is a growable array of bytes. Growth is managed in terms of
* chunks that are allocated when needed.
@@ -237,6 +237,7 @@ public void write(OutputStream out, int offset,
}
}
+ @Override
public String toString() {
int i;
StringBuilder sb = new StringBuilder(length * 3);
@@ -266,5 +267,30 @@ public void setByteBuffer(ByteBuffer result, int offset, int length) {
currentLength = Math.min(length, chunkSize - currentOffset);
}
}
-}
+ /**
+ * Gets all the bytes of the array as one contiguous byte array.
+ *
+ * @return the bytes of the array, or null if the array is empty
+ */
+ public byte[] get() {
+ byte[] result = null;
+ if (length > 0) {
+ int currentChunk = 0;
+ int currentOffset = 0;
+ int currentLength = Math.min(length, chunkSize);
+ int destOffset = 0;
+ result = new byte[length];
+ int totalLength = length;
+ while (totalLength > 0) {
+ System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength);
+ destOffset += currentLength;
+ totalLength -= currentLength;
+ currentChunk += 1;
+ currentOffset = 0;
+ currentLength = Math.min(totalLength, chunkSize - currentOffset);
+ }
+ }
+ return result;
+ }
+}
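
Note (not part of the patch): to make the chunk arithmetic in the new get() easier to follow, here is a standalone sketch of the same copy loop over a plain byte[][]; the class name and data are illustrative.

    import java.util.Arrays;

    public class ChunkedCopySketch {
      // Copy totalLength bytes spread across fixed-size chunks into one array.
      static byte[] flatten(byte[][] chunks, int chunkSize, int totalLength) {
        byte[] result = new byte[totalLength];
        int remaining = totalLength;
        int chunk = 0;
        int destOffset = 0;
        while (remaining > 0) {
          int copyLength = Math.min(remaining, chunkSize);
          System.arraycopy(chunks[chunk], 0, result, destOffset, copyLength);
          destOffset += copyLength;
          remaining -= copyLength;
          chunk += 1;
        }
        return result;
      }

      public static void main(String[] args) {
        byte[][] chunks = { {1, 2, 3, 4}, {5, 6, 7, 8}, {9, 0, 0, 0} };
        System.out.println(Arrays.toString(flatten(chunks, 4, 9)));
        // prints [1, 2, 3, 4, 5, 6, 7, 8, 9]
      }
    }
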
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
index 05240ce..e976773 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
@@ -39,6 +39,14 @@
Object next(Object previous) throws IOException;
/**
+ * Read the next row batch.
+ * @param previous a row batch object that can be reused by the reader
+ * @return the row batch that was read
+ * @throws java.io.IOException
+ */
+ Object nextBatch(Object previous) throws IOException;
+
+ /**
* Get the row number of the row that will be returned by the following
* call to next().
* @return the row number from 0 to the number of rows in the file
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index d044cd8..9f9163f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -30,6 +30,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -114,8 +119,17 @@ public long getNext() {
private BitFieldReader present = null;
protected boolean valuePresent = false;
+ /*
+ * Used in vectorized batch processing: isNull[i] is true when the value
+ * in row i of the current batch is null.
+ */
+ protected boolean[] isNull;
+ // True when the current batch contains no null values at all.
+ protected boolean noNulls;
+
TreeReader(int columnId) {
this.columnId = columnId;
+ isNull = new boolean[VectorizedRowBatch.DEFAULT_SIZE];
}
void startStripe(Map streams,
@@ -164,6 +178,19 @@ Object next(Object previous) throws IOException {
}
return previous;
}
+
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ if (present != null) {
+ noNulls = true;
+ for (int i = 0; i < batchSize; i++) {
+ isNull[i] = present.next() != 1;
+ if (isNull[i]) {
+ noNulls = false;
+ }
+ }
+ }
+ return previous;
+ }
}
private static class BooleanTreeReader extends TreeReader{
@@ -207,6 +234,12 @@ Object next(Object previous) throws IOException {
}
return result;
}
+
+ @Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextBatch is not supported operation on Boolean type");
+ }
}
private static class ByteTreeReader extends TreeReader{
@@ -247,6 +280,12 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextBatch is not supported operation for Byte type");
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -291,6 +330,22 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ super.nextBatch(previous, batchSize);
+ LongColumnVector result = null;
+ if (previous == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previous;
+ }
+
+ result.isNull = isNull;
+ result.noNulls = noNulls;
+ reader.nextBatch(result, batchSize);
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -335,6 +390,21 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ super.nextBatch(previous, batchSize);
+ LongColumnVector result = null;
+ if (previous == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previous;
+ }
+ result.isNull = isNull;
+ result.noNulls = noNulls;
+ reader.nextBatch(result, batchSize);
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -379,6 +449,22 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ super.nextBatch(previous, batchSize);
+ LongColumnVector result = null;
+ if (previous == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previous;
+ }
+
+ result.isNull = isNull;
+ result.noNulls = noNulls;
+ reader.nextBatch(result, batchSize);
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -423,6 +509,35 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ super.nextBatch(previous, batchSize);
+ DoubleColumnVector result = null;
+ if (previous == null) {
+ result = new DoubleColumnVector();
+ } else {
+ result = (DoubleColumnVector) previous;
+ }
+
+ // Read entries
+ result.isNull = isNull;
+ result.noNulls = noNulls;
+ for (int i = 0; i < batchSize; i++) {
+ if (!this.isNull[i]) {
+ result.vector[i] = SerializationUtils.readFloat(stream);
+ }
+ }
+
+ // Set isRepeating flag
+ result.isRepeating = true;
+ for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) {
+ if (result.vector[i] != result.vector[i + 1]) {
+ result.isRepeating = false;
+ }
+ }
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
for(int i=0; i < items; ++i) {
@@ -471,6 +586,35 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ super.nextBatch(previous, batchSize);
+ DoubleColumnVector result = null;
+ if (previous == null) {
+ result = new DoubleColumnVector();
+ } else {
+ result = (DoubleColumnVector) previous;
+ }
+
+ // Read entries
+ result.isNull = isNull;
+ result.noNulls = noNulls;
+ for (int i = 0; i < batchSize; i++) {
+ if (!this.isNull[i]) {
+ result.vector[i] = SerializationUtils.readDouble(stream);
+ }
+ }
+
+ // Set isRepeating flag
+ result.isRepeating = true;
+ for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) {
+ if (result.vector[i] != result.vector[i + 1]) {
+ result.isRepeating = false;
+ }
+ }
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
stream.skip(items * 8);
@@ -531,6 +675,12 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextBatch is not supported operation for Binary type");
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long lengthToSkip = 0;
@@ -592,6 +742,12 @@ Object next(Object previous) throws IOException {
return result;
}
+ @Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextBatch is not supported operation for TimeStamp type");
+ }
+
private static int parseNanos(long serialized) {
int zeros = 7 & (int) serialized;
int result = (int) serialized >>> 3;
@@ -648,6 +804,12 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextBatch is not supported operation for Decimal type");
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
for(int i=0; i < items; i++) {
@@ -663,8 +825,12 @@ void skipRows(long items) throws IOException {
private int[] dictionaryOffsets;
private RunLengthIntegerReader reader;
+ private final LongColumnVector scratchlcv;
+
StringTreeReader(int columnId) {
super(columnId);
+ scratchlcv = new LongColumnVector();
+ scratchlcv.isNull = isNull;
}
@Override
@@ -725,14 +891,7 @@ Object next(Object previous) throws IOException {
result = (Text) previous;
}
int offset = dictionaryOffsets[entry];
- int length;
- // if it isn't the last entry, subtract the offsets otherwise use
- // the buffer length.
- if (entry < dictionaryOffsets.length - 1) {
- length = dictionaryOffsets[entry + 1] - offset;
- } else {
- length = dictionaryBuffer.size() - offset;
- }
+ int length = getDictionaryEntryLength(entry, offset);
// If the column is just empty strings, the size will be zero, so the buffer will be null,
// in that case just return result as it will default to empty
if (dictionaryBuffer != null) {
@@ -745,6 +904,54 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ super.nextBatch(previous, batchSize);
+ BytesColumnVector result = null;
+ int offset = 0, length = 0;
+ if (previous == null) {
+ result = new BytesColumnVector();
+ } else {
+ result = (BytesColumnVector) previous;
+ }
+
+ byte[] dictionaryBytes = dictionaryBuffer.get();
+ // Read offsets
+ reader.nextBatch(scratchlcv, batchSize);
+ if (!scratchlcv.isRepeating) {
+ for (int i = 0; i < batchSize; i++) {
+ if (!scratchlcv.isNull[i]) {
+ offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
+ length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
+ result.setRef(i, dictionaryBytes, offset, length);
+ }
+ }
+ } else {
+ // If the value is repeating then just set the first value in the
+ // vector and set the isRepeating flag to true. There is no need to
+ // iterate through and set every element to the same value.
+ offset = dictionaryOffsets[(int) scratchlcv.vector[0]];
+ length = getDictionaryEntryLength((int) scratchlcv.vector[0], offset);
+ result.setRef(0, dictionaryBytes, offset, length);
+ }
+ result.isRepeating = scratchlcv.isRepeating;
+ result.isNull = isNull;
+ result.noNulls = noNulls;
+ return result;
+ }
+
+ int getDictionaryEntryLength(int entry, int offset) {
+ int length = 0;
+ // if it isn't the last entry, subtract the offsets otherwise use
+ // the buffer length.
+ if (entry < dictionaryOffsets.length - 1) {
+ length = dictionaryOffsets[entry + 1] - offset;
+ } else {
+ length = dictionaryBuffer.size() - offset;
+ }
+ return length;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -799,6 +1006,28 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ ColumnVector[] result = null;
+ super.nextBatch(previous, batchSize);
+ if (previous == null) {
+ result = new ColumnVector[fields.length];
+ } else {
+ result = (ColumnVector[]) previous;
+ }
+
+ for (int i = 0; i < fields.length; i++) {
+ if (fields[i] != null) {
+ if (result[i] == null) {
+ result[i] = (ColumnVector) fields[i].nextBatch(null, batchSize);
+ } else {
+ fields[i].nextBatch(result[i], batchSize);
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
void startStripe(Map streams,
List encodings
) throws IOException {
@@ -866,6 +1095,12 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextBatch is not supported operation for Union type");
+ }
+
+ @Override
void startStripe(Map streams,
List encodings
) throws IOException {
@@ -942,6 +1177,12 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextBatch is not supported operation for List type");
+ }
+
+ @Override
void startStripe(Map streams,
List encodings
) throws IOException {
@@ -1019,6 +1260,12 @@ Object next(Object previous) throws IOException {
}
@Override
+ Object nextBatch(Object previous, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextBatch is not supported operation for Map type");
+ }
+
+ @Override
void startStripe(Map streams,
List encodings
) throws IOException {
@@ -1207,6 +1454,29 @@ public Object next(Object previous) throws IOException {
}
@Override
+ public Object nextBatch(Object previous) throws IOException {
+ VectorizedRowBatch result = null;
+ if (rowInStripe >= rowCountInStripe) {
+ currentStripe += 1;
+ readStripe();
+ }
+
+ long batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (rowCountInStripe - rowInStripe));
+ rowInStripe += batchSize;
+ if (previous == null) {
+ ColumnVector[] cols = (ColumnVector[]) reader.nextBatch(null, (int) batchSize);
+ result = new VectorizedRowBatch(cols.length);
+ result.cols = cols;
+ } else {
+ result = (VectorizedRowBatch) previous;
+ reader.nextBatch(result.cols, (int) batchSize);
+ }
+
+ result.size = (int) batchSize;
+ return result;
+ }
+
+ @Override
public void close() throws IOException {
file.close();
}
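
Note (not part of the patch): a standalone illustration of the PRESENT-stream handling that the base TreeReader.nextBatch() above performs for every column type. Bits, names and data are made up; only the isNull/noNulls rule matches the patch.

    import java.util.Arrays;

    public class PresentBitsSketch {
      public static void main(String[] args) {
        int[] presentBits = {1, 1, 0, 1}; // a bit of 1 means the value is present
        boolean[] isNull = new boolean[presentBits.length];
        boolean noNulls = true;
        for (int i = 0; i < presentBits.length; i++) {
          isNull[i] = presentBits[i] != 1;
          if (isNull[i]) {
            noNulls = false;
          }
        }
        System.out.println("noNulls=" + noNulls);     // noNulls=false (row 2 is null)
        System.out.println(Arrays.toString(isNull));  // [false, false, true, false]
      }
    }
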
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
index 2825c64..bf92a25 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
@@ -20,6 +20,8 @@
import java.io.EOFException;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
/**
* A reader that reads a sequence of integers.
* */
@@ -88,6 +90,20 @@ long next() throws IOException {
return result;
}
+ void nextBatch(LongColumnVector previous, long batchSize)
+ throws IOException {
+ previous.isRepeating = true;
+ for (int i = 0; i < batchSize; i++) {
+ if (!previous.isNull[i]) {
+ previous.vector[i] = next();
+ }
+ // The batch is repeating only if every row has the same null flag as the
+ // previous row and, for non-null rows, the same value.
+ if (previous.isRepeating && i > 0
+ && (previous.isNull[i] != previous.isNull[i - 1]
+ || (!previous.isNull[i] && previous.vector[i] != previous.vector[i - 1]))) {
+ previous.isRepeating = false;
+ }
+ }
+ }
+
void seek(PositionProvider index) throws IOException {
input.seek(index);
int consumed = (int) index.getNext();
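
Note (not part of the patch): the isRepeating rule applied while decoding a batch of run-length encoded integers, shown in isolation; the class name and data are illustrative. Rows repeat only when every row has the same null flag as the previous row and non-null values are equal.

    public class IsRepeatingSketch {
      static boolean isRepeating(long[] vector, boolean[] isNull, int size) {
        for (int i = 1; i < size; i++) {
          if (isNull[i] != isNull[i - 1]
              || (!isNull[i] && vector[i] != vector[i - 1])) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        long[] values = {7, 7, 7, 7};
        boolean[] nulls = new boolean[4];
        System.out.println(isRepeating(values, nulls, 4)); // true
        values[2] = 8;
        System.out.println(isRepeating(values, nulls, 4)); // false
      }
    }
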
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
new file mode 100644
index 0000000..01a22da
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.File;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.Writable;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestVectorizedORCReader {
+
+ Configuration conf;
+ FileSystem fs;
+ Path testFilePath;
+
+ @Before
+ public void openFileSystem() throws Exception {
+ conf = new Configuration();
+ fs = FileSystem.getLocal(conf);
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+ fs.setWorkingDirectory(workDir);
+ testFilePath = new Path("TestVectorizedORCReader.testDump.orc");
+ fs.delete(testFilePath, false);
+ }
+
+ static class MyRecord {
+ int i;
+ long l;
+ short s;
+ double d;
+ String k;
+
+ MyRecord(int i, int l, short s, double d, String k) {
+ this.i = i;
+ this.l = l;
+ this.s = s;
+ this.d = d;
+ this.k = k;
+ }
+ }
+
+ @Test
+ public void createFile() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestVectorizedORCReader.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (MyRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+ 100000, CompressionKind.ZLIB, 10000, 10000);
+ Random r1 = new Random(1);
+ String[] words = new String[] {"It", "was", "the", "best", "of", "times,",
+ "it", "was", "the", "worst", "of", "times,", "it", "was", "the", "age",
+ "of", "wisdom,", "it", "was", "the", "age", "of", "foolishness,", "it",
+ "was", "the", "epoch", "of", "belief,", "it", "was", "the", "epoch",
+ "of", "incredulity,", "it", "was", "the", "season", "of", "Light,",
+ "it", "was", "the", "season", "of", "Darkness,", "it", "was", "the",
+ "spring", "of", "hope,", "it", "was", "the", "winter", "of", "despair,",
+ "we", "had", "everything", "before", "us,", "we", "had", "nothing",
+ "before", "us,", "we", "were", "all", "going", "direct", "to",
+ "Heaven,", "we", "were", "all", "going", "direct", "the", "other",
+ "way"};
+ for (int i = 0; i < 21000; ++i) {
+ writer.addRow(new MyRecord(i, 200, (short) (300 + i), (double) (400 + i), words[r1
+ .nextInt(words.length)]));
+ }
+ writer.close();
+ checkVectorizedReader();
+ }
+
+ private void checkVectorizedReader() throws Exception {
+
+ Reader vreader = OrcFile.createReader(testFilePath.getFileSystem(conf), testFilePath);
+ Reader reader = OrcFile.createReader(testFilePath.getFileSystem(conf), testFilePath);
+ RecordReaderImpl vrr = (RecordReaderImpl) vreader.rows(null);
+ RecordReaderImpl rr = (RecordReaderImpl) reader.rows(null);
+ VectorizedRowBatch batch = null;
+ OrcStruct row = null;
+ // Check Vectorized ORC reader against ORC row reader
+ while (vrr.hasNext()) {
+ batch = (VectorizedRowBatch) vrr.nextBatch((Object) batch);
+ for (int i = 0; i < batch.size; i++) {
+ row = (OrcStruct) rr.next((Object) row);
+ for (int j = 0; j < batch.cols.length; j++) {
+ String a = ((Writable) row.getFieldValue(j)).toString();
+ String b = batch.cols[j].getWritableObject(i).toString();
+ Assert.assertEquals(a, b);
+ }
+ }
+ // Check repeating
+ Assert.assertEquals(false, batch.cols[0].isRepeating);
+ Assert.assertEquals(true, batch.cols[1].isRepeating);
+ Assert.assertEquals(false, batch.cols[2].isRepeating);
+ Assert.assertEquals(false, batch.cols[3].isRepeating);
+ Assert.assertEquals(false, batch.cols[4].isRepeating);
+
+ // Check non null
+ Assert.assertEquals(true, batch.cols[0].noNulls);
+ Assert.assertEquals(true, batch.cols[1].noNulls);
+ Assert.assertEquals(true, batch.cols[2].noNulls);
+ Assert.assertEquals(true, batch.cols[3].noNulls);
+ Assert.assertEquals(true, batch.cols[4].noNulls);
+ }
+ Assert.assertEquals(false, rr.hasNext());
+ }
+}