diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java index 246170d..2a45057 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java @@ -18,16 +18,17 @@ package org.apache.hadoop.hive.ql.exec.vector; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; /** - * This class supports string and binary data by value reference -- i.e. each field is + * This class supports string and binary data by value reference -- i.e. each field is * explicitly present, as opposed to provided by a dictionary reference. * In some cases, all the values will be in the same byte array to begin with, - * but this need not be the case. If each value is in a separate byte + * but this need not be the case. If each value is in a separate byte * array to start with, or not all of the values are in the same original * byte array, you can still assign data by reference into this column vector. - * This gives flexibility to use this in multiple situations. + * This gives flexibility to use this in multiple situations. *

* When setting data by reference, the caller * is responsible for allocating the byte arrays used to hold the data. @@ -36,23 +37,23 @@ * though that use is probably not typical. */ public class BytesColumnVector extends ColumnVector { - public byte[][] vector; + public byte[][] vector; public int[] start; // start offset of each field - + /* - * The length of each field. If the value repeats for every entry, then it is stored + * The length of each field. If the value repeats for every entry, then it is stored * in vector[0] and isRepeating from the superclass is set to true. */ - public int[] length; + public int[] length; private byte[] buffer; // optional buffer to use when actually copying in data private int nextFree; // next free position in buffer - + // Estimate that there will be 16 bytes per entry static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE; - - // Proportion of extra space to provide when allocating more buffer space. + + // Proportion of extra space to provide when allocating more buffer space. static final float EXTRA_SPACE_FACTOR = (float) 1.2; - + /** * Use this constructor for normal operation. * All column vectors should be the default size normally. @@ -60,21 +61,21 @@ public BytesColumnVector() { this(VectorizedRowBatch.DEFAULT_SIZE); } - + /** * Don't call this constructor except for testing purposes. - * + * * @param size number of elements in the column vector */ public BytesColumnVector(int size) { super(size); vector = new byte[size][]; start = new int[size]; - length = new int[size]; + length = new int[size]; } - + /** Set a field by reference. - * + * * @param elementNum index within column vector to set * @param sourceBuf container of source data * @param start start byte position within source @@ -85,37 +86,37 @@ public void setRef(int elementNum, byte[] sourceBuf, int start, int length) { this.start[elementNum] = start; this.length[elementNum] = length; } - - /** + + /** * You must call initBuffer first before using setVal(). * Provide the estimated number of bytes needed to hold * a full column vector worth of byte string data. - * + * * @param estimatedValueSize Estimated size of buffer space needed */ public void initBuffer(int estimatedValueSize) { nextFree = 0; - + // if buffer is already allocated, keep using it, don't re-allocate if (buffer != null) { return; } - + // allocate a little extra space to limit need to re-allocate int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR); if (bufferSize < DEFAULT_BUFFER_SIZE) { bufferSize = DEFAULT_BUFFER_SIZE; } - buffer = new byte[bufferSize]; + buffer = new byte[bufferSize]; } - + /** * Initialize buffer to default size. */ public void initBuffer() { initBuffer(0); } - + /** * @return amount of buffer space currently allocated */ @@ -125,13 +126,13 @@ public int bufferSize() { } return buffer.length; } - + /** * Set a field by actually copying in to a local buffer. * If you must actually copy data in to the array, use this method. * DO NOT USE this method unless it's not practical to set data by reference with setRef(). * Setting data by reference tends to run a lot faster than copying data in. 
- * + * * @param elementNum index within column vector to set * @param sourceBuf container of source data * @param start start byte position within source @@ -147,24 +148,24 @@ public void setVal(int elementNum, byte[] sourceBuf, int start, int length) { this.length[elementNum] = length; nextFree += length; } - + /** * Increase buffer space enough to accommodate next element. - * This uses an exponential increase mechanism to rapidly + * This uses an exponential increase mechanism to rapidly * increase buffer size to enough to hold all data. * As batches get re-loaded, buffer space allocated will quickly * stabilize. - * + * * @param nextElemLength size of next element to be added */ public void increaseBufferSpace(int nextElemLength) { - + // Keep doubling buffer size until there will be enough space for next element. - int newLength = 2 * buffer.length; + int newLength = 2 * buffer.length; while((nextFree + nextElemLength) > newLength) { newLength *= 2; } - + // Allocate new buffer, copy data to it, and set buffer to new buffer. byte[] newBuffer = new byte[newLength]; System.arraycopy(buffer, 0, newBuffer, 0, nextFree); @@ -173,9 +174,11 @@ public void increaseBufferSpace(int nextElemLength) { @Override public Writable getWritableObject(int index) { - - // TODO finish this - throw new UnsupportedOperationException("unfinished"); + Text result = null; + if (!isNull[index]) { + result = new Text(); + result.append(vector[index], start[index], length[index]); + } + return result; } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java index fc4e53b..05f80cc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java @@ -17,13 +17,13 @@ */ package org.apache.hadoop.hive.ql.io.orc; -import org.apache.hadoop.io.Text; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import org.apache.hadoop.io.Text; + /** * A class that is a growable array of bytes. Growth is managed in terms of * chunks that are allocated when needed. @@ -237,6 +237,7 @@ public void write(OutputStream out, int offset, } } + @Override public String toString() { int i; StringBuilder sb = new StringBuilder(length * 3); @@ -266,5 +267,30 @@ public void setByteBuffer(ByteBuffer result, int offset, int length) { currentLength = Math.min(length, chunkSize - currentOffset); } } -} + /** + * Gets all the bytes of the array. 
+ * + * @return Bytes of the array + */ + public byte[] get() { + byte[] result = null; + if (length > 0) { + int currentChunk = 0; + int currentOffset = 0; + int currentLength = Math.min(length, chunkSize); + int destOffset = 0; + result = new byte[length]; + int totalLength = length; + while (totalLength > 0) { + System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength); + destOffset += currentLength; + totalLength -= currentLength; + currentChunk += 1; + currentOffset = 0; + currentLength = Math.min(totalLength, chunkSize - currentOffset); + } + } + return result; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java index 05240ce..dba9071 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java @@ -19,6 +19,8 @@ import java.io.IOException; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; + /** * A row-by-row iterator for ORC files. */ @@ -39,6 +41,16 @@ Object next(Object previous) throws IOException; /** + * Read the next row batch. The size of the batch to read cannot be controlled + * by the callers. Callers need to look at VectorizedRowBatch.size of the returned + * object to know the batch size read. + * @param previousBatch a row batch object that can be reused by the reader + * @return the row batch that was read + * @throws java.io.IOException + */ + VectorizedRowBatch nextBatch(VectorizedRowBatch previousBatch) throws IOException; + + /** * Get the row number of the row that will be returned by the following * call to next(). * @return the row number from 0 to the number of rows in the file diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 3ce1fab..e43c1a0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -30,6 +30,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; @@ -164,6 +169,31 @@ Object next(Object previous) throws IOException { } return previous; } + /** + * Populates the isNull vector array in the previousVector object based on + * the present stream values. This function is called from all the child + * readers, and they all set the values based on the isNull field value. 
+ * @param previousVector The columnVector object whose isNull value is populated + * @param batchSize Size of the column vector + * @return + * @throws IOException + */ + Object nextVector(Object previousVector, long batchSize) throws IOException { + if (present != null) { + + // Set noNulls and isNull vector of the ColumnVector based on + // present stream + ColumnVector result = (ColumnVector) previousVector; + result.noNulls = true; + for (int i = 0; i < batchSize; i++) { + result.isNull[i] = (present.next() != 1); + if (result.noNulls && result.isNull[i]) { + result.noNulls = false; + } + } + } + return previousVector; + } } private static class BooleanTreeReader extends TreeReader{ @@ -207,6 +237,12 @@ Object next(Object previous) throws IOException { } return result; } + + @Override + Object nextVector(Object previousVector, long batchSize) throws IOException { + throw new UnsupportedOperationException( + "NextVector is not supported operation on Boolean type"); + } } private static class ByteTreeReader extends TreeReader{ @@ -247,6 +283,12 @@ Object next(Object previous) throws IOException { } @Override + Object nextVector(Object previousVector, long batchSize) throws IOException { + throw new UnsupportedOperationException( + "NextVector is not supported operation for Byte type"); + } + + @Override void skipRows(long items) throws IOException { reader.skip(countNonNulls(items)); } @@ -291,6 +333,23 @@ Object next(Object previous) throws IOException { } @Override + Object nextVector(Object previousVector, long batchSize) throws IOException { + LongColumnVector result = null; + if (previousVector == null) { + result = new LongColumnVector(); + } else { + result = (LongColumnVector) previousVector; + } + + // Read present/isNull stream + super.nextVector(result, batchSize); + + // Read value entries based on isNull entries + reader.nextVector(result, batchSize); + return result; + } + + @Override void skipRows(long items) throws IOException { reader.skip(countNonNulls(items)); } @@ -335,6 +394,23 @@ Object next(Object previous) throws IOException { } @Override + Object nextVector(Object previousVector, long batchSize) throws IOException { + LongColumnVector result = null; + if (previousVector == null) { + result = new LongColumnVector(); + } else { + result = (LongColumnVector) previousVector; + } + + // Read present/isNull stream + super.nextVector(result, batchSize); + + // Read value entries based on isNull entries + reader.nextVector(result, batchSize); + return result; + } + + @Override void skipRows(long items) throws IOException { reader.skip(countNonNulls(items)); } @@ -379,6 +455,23 @@ Object next(Object previous) throws IOException { } @Override + Object nextVector(Object previousVector, long batchSize) throws IOException { + LongColumnVector result = null; + if (previousVector == null) { + result = new LongColumnVector(); + } else { + result = (LongColumnVector) previousVector; + } + + // Read present/isNull stream + super.nextVector(result, batchSize); + + // Read value entries based on isNull entries + reader.nextVector(result, batchSize); + return result; + } + + @Override void skipRows(long items) throws IOException { reader.skip(countNonNulls(items)); } @@ -423,6 +516,39 @@ Object next(Object previous) throws IOException { } @Override + Object nextVector(Object previousVector, long batchSize) throws IOException { + DoubleColumnVector result = null; + if (previousVector == null) { + result = new DoubleColumnVector(); + } else { + result = (DoubleColumnVector) 
previousVector; + } + + // Read present/isNull stream + super.nextVector(result, batchSize); + + // Read value entries based on isNull entries + for (int i = 0; i < batchSize; i++) { + if (!result.isNull[i]) { + result.vector[i] = SerializationUtils.readDouble(stream); + } else { + + // If the value is not present then set NaN + result.vector[i] = Double.NaN; + } + } + + // Set isRepeating flag + result.isRepeating = true; + for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) { + if (result.vector[i] != result.vector[i + 1]) { + result.isRepeating = false; + } + } + return result; + } + + @Override void skipRows(long items) throws IOException { items = countNonNulls(items); for(int i=0; i < items; ++i) { @@ -471,6 +597,38 @@ Object next(Object previous) throws IOException { } @Override + Object nextVector(Object previousVector, long batchSize) throws IOException { + DoubleColumnVector result = null; + if (previousVector == null) { + result = new DoubleColumnVector(); + } else { + result = (DoubleColumnVector) previousVector; + } + + // Read present/isNull stream + super.nextVector(result, batchSize); + + // Read value entries based on isNull entries + for (int i = 0; i < batchSize; i++) { + if (!result.isNull[i]) { + result.vector[i] = SerializationUtils.readDouble(stream); + } else { + // If the value is not present then set NaN + result.vector[i] = Double.NaN; + } + } + + // Set isRepeating flag + result.isRepeating = true; + for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) { + if (result.vector[i] != result.vector[i + 1]) { + result.isRepeating = false; + } + } + return result; + } + + @Override void skipRows(long items) throws IOException { items = countNonNulls(items); stream.skip(items * 8); @@ -531,6 +689,12 @@ Object next(Object previous) throws IOException { } @Override + Object nextVector(Object previousVector, long batchSize) throws IOException { + throw new UnsupportedOperationException( + "NextBatch is not supported operation for Binary type"); + } + + @Override void skipRows(long items) throws IOException { items = countNonNulls(items); long lengthToSkip = 0; @@ -592,6 +756,12 @@ Object next(Object previous) throws IOException { return result; } + @Override + Object nextVector(Object previousVector, long batchSize) throws IOException { + throw new UnsupportedOperationException( + "NextVector is not supported operation for TimeStamp type"); + } + private static int parseNanos(long serialized) { int zeros = 7 & (int) serialized; int result = (int) serialized >>> 3; @@ -648,6 +818,12 @@ Object next(Object previous) throws IOException { } @Override + Object nextVector(Object previousVector, long batchSize) throws IOException { + throw new UnsupportedOperationException( + "NextVector is not supported operation for Decimal type"); + } + + @Override void skipRows(long items) throws IOException { items = countNonNulls(items); for(int i=0; i < items; i++) { @@ -663,8 +839,11 @@ void skipRows(long items) throws IOException { private int[] dictionaryOffsets; private RunLengthIntegerReader reader; + private final LongColumnVector scratchlcv; + StringTreeReader(int columnId) { super(columnId); + scratchlcv = new LongColumnVector(); } @Override @@ -725,14 +904,7 @@ Object next(Object previous) throws IOException { result = (Text) previous; } int offset = dictionaryOffsets[entry]; - int length; - // if it isn't the last entry, subtract the offsets otherwise use - // the buffer length. 
- if (entry < dictionaryOffsets.length - 1) { - length = dictionaryOffsets[entry + 1] - offset; - } else { - length = dictionaryBuffer.size() - offset; - } + int length = getDictionaryEntryLength(entry, offset); // If the column is just empty strings, the size will be zero, // so the buffer will be null, in that case just return result // as it will default to empty @@ -746,6 +918,62 @@ Object next(Object previous) throws IOException { } @Override + Object nextVector(Object previousVector, long batchSize) throws IOException { + BytesColumnVector result = null; + int offset = 0, length = 0; + if (previousVector == null) { + result = new BytesColumnVector(); + } else { + result = (BytesColumnVector) previousVector; + } + + // Read present/isNull stream + super.nextVector(result, batchSize); + + byte[] dictionaryBytes = dictionaryBuffer.get(); + + // Read string offsets + scratchlcv.isNull = result.isNull; + reader.nextVector(scratchlcv, batchSize); + if (!scratchlcv.isRepeating) { + + // The vector has non-repeating strings. Iterate thru the batch + // and set strings one by one + for (int i = 0; i < batchSize; i++) { + if (!scratchlcv.isNull[i]) { + offset = dictionaryOffsets[(int) scratchlcv.vector[i]]; + length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset); + result.setRef(i, dictionaryBytes, offset, length); + } else { + // If the value is null then set offset and length to zero (null string) + result.setRef(i, dictionaryBytes, 0, 0); + } + } + } else { + // If the value is repeating then just set the first value in the + // vector and set the isRepeating flag to true. No need to iterate thru and + // set all the elements to the same value + offset = dictionaryOffsets[(int) scratchlcv.vector[0]]; + length = getDictionaryEntryLength((int) scratchlcv.vector[0], offset); + result.setRef(0, dictionaryBytes, offset, length); + } + result.isRepeating = scratchlcv.isRepeating; + return result; + } + + int getDictionaryEntryLength(int entry, int offset) { + int length = 0; + // if it isn't the last entry, subtract the offsets otherwise use + // the buffer length. 
+ if (entry < dictionaryOffsets.length - 1) { + length = dictionaryOffsets[entry + 1] - offset; + } else { + length = dictionaryBuffer.size() - offset; + } + return length; + } + + @Override void skipRows(long items) throws IOException { reader.skip(countNonNulls(items)); } @@ -807,6 +1035,28 @@ Object next(Object previous) throws IOException { } @Override + Object nextVector(Object previousVector, long batchSize) throws IOException { + ColumnVector[] result = null; + if (previousVector == null) { + result = new ColumnVector[fields.length]; + } else { + result = (ColumnVector[]) previousVector; + } + + // Read all the members of struct as column vectors + for (int i = 0; i < fields.length; i++) { + if (fields[i] != null) { + if (result[i] == null) { + result[i] = (ColumnVector) fields[i].nextVector(null, batchSize); + } else { + fields[i].nextVector(result[i], batchSize); + } + } + } + return result; + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { @@ -874,6 +1124,12 @@ Object next(Object previous) throws IOException { } @Override + Object nextVector(Object previousVector, long batchSize) throws IOException { + throw new UnsupportedOperationException( + "NextVector is not supported operation for Union type"); + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { @@ -950,6 +1206,12 @@ Object next(Object previous) throws IOException { } @Override + Object nextVector(Object previous, long batchSize) throws IOException { + throw new UnsupportedOperationException( + "NextVector is not supported operation for List type"); + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { @@ -1027,6 +1289,12 @@ Object next(Object previous) throws IOException { } @Override + Object nextVector(Object previous, long batchSize) throws IOException { + throw new UnsupportedOperationException( + "NextVector is not supported operation for Map type"); + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { @@ -1215,6 +1483,29 @@ public Object next(Object previous) throws IOException { } @Override + public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOException { + VectorizedRowBatch result = null; + if (rowInStripe >= rowCountInStripe) { + currentStripe += 1; + readStripe(); + } + + long batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (rowCountInStripe - rowInStripe)); + rowInStripe += batchSize; + if (previous == null) { + ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize); + result = new VectorizedRowBatch(cols.length); + result.cols = cols; + } else { + result = (VectorizedRowBatch) previous; + reader.nextVector(result.cols, (int) batchSize); + } + + result.size = (int) batchSize; + return result; + } + + @Override public void close() throws IOException { file.close(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java index 2825c64..0b987ce 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java @@ -20,6 +20,8 @@ import java.io.EOFException; import java.io.IOException; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; + /** * A reader that reads a sequence of integers. 
* */ @@ -88,6 +90,24 @@ long next() throws IOException { return result; } + void nextVector(LongColumnVector previous, long previousLen) + throws IOException { + previous.isRepeating = true; + for (int i = 0; i < previousLen; i++) { + if (!previous.isNull[i]) { + previous.vector[i] = next(); + } else { + // The default value of null for int type in vectorized + // processing is 1, so set that if the value is null + previous.vector[i] = 1; + } + if (previous.isRepeating && (delta != 0 || !repeat)) { + previous.isRepeating = false; + } + } + } + + void seek(PositionProvider index) throws IOException { input.seek(index); int consumed = (int) index.getNext(); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java new file mode 100644 index 0000000..f87763f --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.File; +import java.util.Random; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.io.Writable; +import org.junit.Before; +import org.junit.Test; + +/** +* +* Class that tests ORC reader vectorization by comparing records that are +* returned by "row by row" reader with batch reader. 
+* +*/ +public class TestVectorizedORCReader { + + private Configuration conf; + private FileSystem fs; + private Path testFilePath; + + @Before + public void openFileSystem() throws Exception { + conf = new Configuration(); + fs = FileSystem.getLocal(conf); + Path workDir = new Path(System.getProperty("test.tmp.dir", + "target" + File.separator + "test" + File.separator + "tmp")); + fs.setWorkingDirectory(workDir); + testFilePath = new Path("TestVectorizedORCReader.testDump.orc"); + fs.delete(testFilePath, false); + } + + static class MyRecord { + private final Integer i; + private final Long l; + private final Short s; + private final Double d; + private final String k; + + MyRecord(Integer i, Long l, Short s, Double d, String k) { + this.i = i; + this.l = l; + this.s = s; + this.d = d; + this.k = k; + } + } + + @Test + public void createFile() throws Exception { + ObjectInspector inspector; + synchronized (TestVectorizedORCReader.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector + (MyRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.ZLIB, 10000, 10000); + Random r1 = new Random(1); + String[] words = new String[] {"It", "was", "the", "best", "of", "times,", + "it", "was", "the", "worst", "of", "times,", "it", "was", "the", "age", + "of", "wisdom,", "it", "was", "the", "age", "of", "foolishness,", "it", + "was", "the", "epoch", "of", "belief,", "it", "was", "the", "epoch", + "of", "incredulity,", "it", "was", "the", "season", "of", "Light,", + "it", "was", "the", "season", "of", "Darkness,", "it", "was", "the", + "spring", "of", "hope,", "it", "was", "the", "winter", "of", "despair,", + "we", "had", "everything", "before", "us,", "we", "had", "nothing", + "before", "us,", "we", "were", "all", "going", "direct", "to", + "Heaven,", "we", "were", "all", "going", "direct", "the", "other", + "way"}; + for (int i = 0; i < 21000; ++i) { + if ((i % 3) != 0) { + writer.addRow(new MyRecord(i, (long) 200, (short) (300 + i), (double) (400 + i), + words[r1.nextInt(words.length)])); + } else { + writer.addRow(new MyRecord(i, (long) 200, null, null, null)); + } + } + writer.close(); + checkVectorizedReader(); + } + + private void checkVectorizedReader() throws Exception { + + Reader vreader = OrcFile.createReader(testFilePath.getFileSystem(conf), testFilePath); + Reader reader = OrcFile.createReader(testFilePath.getFileSystem(conf), testFilePath); + RecordReaderImpl vrr = (RecordReaderImpl) vreader.rows(null); + RecordReaderImpl rr = (RecordReaderImpl) reader.rows(null); + VectorizedRowBatch batch = null; + OrcStruct row = null; + + // Check Vectorized ORC reader against ORC row reader + while (vrr.hasNext()) { + batch = vrr.nextBatch(batch); + for (int i = 0; i < batch.size; i++) { + row = (OrcStruct) rr.next((Object) row); + for (int j = 0; j < batch.cols.length; j++) { + Object a = ((Writable) row.getFieldValue(j)); + Object b = batch.cols[j].getWritableObject(i); + if (null == a) { + Assert.assertEquals(true, (b == null)); + } else { + Assert.assertEquals(true, b.toString().equals(a.toString())); + } + } + } + + // Check repeating + Assert.assertEquals(false, batch.cols[0].isRepeating); + Assert.assertEquals(true, batch.cols[1].isRepeating); + Assert.assertEquals(false, batch.cols[2].isRepeating); + Assert.assertEquals(false, batch.cols[3].isRepeating); + Assert.assertEquals(false, batch.cols[4].isRepeating); + + // Check non null + 
Assert.assertEquals(true, batch.cols[0].noNulls); + Assert.assertEquals(true, batch.cols[1].noNulls); + Assert.assertEquals(false, batch.cols[2].noNulls); + Assert.assertEquals(false, batch.cols[3].noNulls); + Assert.assertEquals(false, batch.cols[4].noNulls); + } + Assert.assertEquals(false, rr.hasNext()); + } +}
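
For reference, a minimal sketch (not part of this patch) of how a caller might drive the new RecordReader.nextBatch() API, reusing the batch object across calls. The class name NextBatchUsageSketch and the input file path are hypothetical; only APIs added or shown in the diff above are used.

    // Illustrative sketch only -- not part of this patch.
    package org.apache.hadoop.hive.ql.io.orc;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.io.Writable;

    public class NextBatchUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path path = new Path("example.orc");              // hypothetical input file
        Reader orcReader = OrcFile.createReader(fs, path);
        RecordReader rows = orcReader.rows(null);

        VectorizedRowBatch batch = null;
        while (rows.hasNext()) {
          // The reader decides the batch size; callers must consult batch.size.
          batch = rows.nextBatch(batch);
          for (int r = 0; r < batch.size; r++) {
            for (int c = 0; c < batch.cols.length; c++) {
              // getWritableObject() returns null for null entries.
              Writable value = batch.cols[c].getWritableObject(r);
              System.out.println("row " + r + " col " + c + " = " + value);
            }
          }
        }
        rows.close();
      }
    }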
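Similarly, a minimal sketch (not part of this patch) of the two ways to populate a BytesColumnVector that its class comment describes: by reference with setRef(), and by copying into the vector's own buffer with initBuffer() followed by setVal(). The class name BytesColumnVectorUsageSketch and the sample strings are hypothetical.

    // Illustrative sketch only -- not part of this patch.
    package org.apache.hadoop.hive.ql.exec.vector;

    public class BytesColumnVectorUsageSketch {
      public static void main(String[] args) {
        BytesColumnVector col = new BytesColumnVector();

        // Set by reference: the caller allocates and owns the byte array; nothing is copied.
        byte[] shared = "by reference".getBytes();
        col.setRef(0, shared, 0, shared.length);

        // Copy into the column vector's buffer: initBuffer() must be called before setVal().
        col.initBuffer();
        byte[] source = "copied in".getBytes();
        col.setVal(1, source, 0, source.length);
      }
    }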