diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 86f1a71..d3cb086 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -534,6 +534,8 @@
HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),
HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10),
+ HIVE_ORC_ZEROCOPY("hive.exec.orc.zerocopy", false),
+
HIVESKEWJOIN("hive.optimize.skewjoin", false),
HIVECONVERTJOIN("hive.auto.convert.join", true),
HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 8c64633..d07ae48 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -2342,6 +2342,14 @@
+<property>
+  <name>hive.exec.orc.zerocopy</name>
+  <value>false</value>
+  <description>
+    Use zerocopy reads with ORC.
+  </description>
+</property>
+
<name>hive.jar.directory</name>
<value>hdfs:///user/hive/</value>
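
The new flag is read through HiveConf, so it can be set in hive-site.xml or flipped programmatically on a job Configuration. A minimal sketch of the programmatic form, assuming only the HiveConf calls used elsewhere in this patch (the class name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    // Sketch: enable ORC zerocopy reads on a Configuration, the same way the
    // TestOrcFile change below does before opening a RecordReader.
    class EnableZeroCopy {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        HiveConf.setBoolVar(conf, ConfVars.HIVE_ORC_ZEROCOPY, true);
        System.out.println(HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_ZEROCOPY)); // true
      }
    }
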
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/DirectDecompressionCodec.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/DirectDecompressionCodec.java
new file mode 100644
index 0000000..41a77b0
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/DirectDecompressionCodec.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface DirectDecompressionCodec extends CompressionCodec {
+ public boolean isAvailable();
+ public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException;
+}
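
RecordReaderImpl (later in this patch) only asks the shims for a zerocopy reader when the codec is either null or implements this interface and reports isAvailable(). A sketch of that guard, assuming a codec instance obtained inside this package; the helper class and method are illustrative, not part of the patch:

    package org.apache.hadoop.hive.ql.io.orc;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    // Sketch: prefer the direct path only when the codec supports it and both
    // buffers are direct; otherwise fall back to the heap-based decompress().
    class DirectDecompressExample {
      static void decompressPreferDirect(CompressionCodec codec, ByteBuffer in,
          ByteBuffer out) throws IOException {
        if (codec instanceof DirectDecompressionCodec
            && ((DirectDecompressionCodec) codec).isAvailable()
            && in.isDirect() && out.isDirect()) {
          ((DirectDecompressionCodec) codec).directDecompress(in, out);
        } else {
          codec.decompress(in, out);
        }
      }
    }
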
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
index 6da3d03..74ba971 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
@@ -21,8 +21,13 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
abstract class InStream extends InputStream {
+ private static final Log LOG = LogFactory.getLog(InStream.class);
+
private static class UncompressedStream extends InStream {
private final String name;
private final ByteBuffer[] bytes;
@@ -172,7 +177,7 @@ private void readHeader() throws IOException {
bufferSize + " needed = " + chunkLength);
}
// read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
- assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
+ assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
currentOffset += OutStream.HEADER_SIZE;
ByteBuffer slice = this.slice(chunkLength);
@@ -274,14 +279,23 @@ private ByteBuffer slice(int chunkLength) throws IOException {
chunkLength + " bytes");
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format(
+ "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)",
+ compressed.remaining(), len));
+ }
+
// we need to consolidate 2 or more buffers into 1
- // first clear out compressed buffers
+ // first copy out compressed buffers
ByteBuffer copy = allocateBuffer(chunkLength);
currentOffset += compressed.remaining();
len -= compressed.remaining();
copy.put(compressed);
while (len > 0 && (++currentRange) < bytes.length) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
+ }
compressed = bytes[currentRange].duplicate();
if (compressed.remaining() >= len) {
slice = compressed.slice();
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index b9b4651..b699b16 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -160,7 +160,7 @@ static RecordReader createReaderFromFile(
conf);
SearchArgument sarg = createSarg(types, conf);
RecordReader reader =
- file.rows(offset, length, includedColumns, sarg, columnNames);
+ file.rows(offset, length, includedColumns, sarg, columnNames, conf);
return reader;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 2bab0ce..ed656cb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -186,4 +187,22 @@ RecordReader rows(long offset, long length,
boolean[] include, SearchArgument sarg,
String[] neededColumns) throws IOException;
+ /**
+ * Create a RecordReader that will read a section of a file. It starts reading
+ * at the first stripe after the offset and continues to the stripe that
+ * starts at offset + length. It also accepts a list of columns to read and a
+ * search argument. The supplied Configuration determines whether zerocopy
+ * reads are used.
+ * @param offset the minimum offset of the first stripe to read
+ * @param length the distance from offset of the first address to stop reading
+ * at
+ * @param include true for each column that should be included
+ * @param sarg a search argument that limits the rows that should be read.
+ * @param neededColumns the names of the included columns
+ * @param conf configuration for the job
+ * @return the record reader for the rows
+ */
+ RecordReader rows(long offset, long length,
+ boolean[] include, SearchArgument sarg,
+ String[] neededColumns, Configuration conf) throws IOException;
+
}
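
Callers that want zerocopy behaviour pass their job Configuration into the new overload; passing null keeps the old behaviour, as the delegating ReaderImpl change below shows. A minimal usage sketch (the file path is a placeholder):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Reader;
    import org.apache.hadoop.hive.ql.io.orc.RecordReader;

    // Sketch: read every row of an ORC file through the Configuration-aware
    // rows() overload with zerocopy enabled; null include/sarg/neededColumns
    // means "all columns, no predicate", matching the existing overload.
    class ZeroCopyReadExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        HiveConf.setBoolVar(conf, ConfVars.HIVE_ORC_ZEROCOPY, true);
        FileSystem fs = FileSystem.getLocal(conf);
        Path path = new Path("/tmp/example.orc"); // placeholder path
        Reader reader = OrcFile.createReader(fs, path);
        RecordReader rows = reader.rows(0, Long.MAX_VALUE, null, null, null, conf);
        Object row = null;
        while (rows.hasNext()) {
          row = rows.next(row);
        }
        rows.close();
      }
    }
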
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 56f25b7..79e485a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -30,6 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -478,6 +479,13 @@ public RecordReader rows(long offset, long length, boolean[] include
public RecordReader rows(long offset, long length, boolean[] include,
SearchArgument sarg, String[] columnNames
) throws IOException {
+ return rows(offset, length, include, sarg, columnNames, null);
+ }
+
+ @Override
+ public RecordReader rows(long offset, long length, boolean[] include,
+ SearchArgument sarg, String[] columnNames,
+ Configuration conf) throws IOException {
// if included columns is null, then include all columns
if (include == null) {
@@ -487,7 +495,7 @@ public RecordReader rows(long offset, long length, boolean[] include,
return new RecordReaderImpl(this.getStripes(), fileSystem, path, offset,
length, footer.getTypesList(), codec, bufferSize,
- include, footer.getRowIndexStride(), sarg, columnNames);
+ include, footer.getRowIndexStride(), sarg, columnNames, conf);
}
@Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index c3c9685..377165e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -28,13 +28,21 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -46,6 +54,9 @@
import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
+import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
@@ -53,6 +64,8 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import com.google.common.collect.ComparisonChain;
+
class RecordReaderImpl implements RecordReader {
private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
@@ -85,6 +98,89 @@
// an array about which row groups aren't skipped
private boolean[] includedRowGroups = null;
+ private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
+ private final ZeroCopyReaderShim zcr;
+
+ // this is an implementation copied from ElasticByteBufferPool in hadoop-2,
+ // which lacks a clear()/clean() operation; clear() is added here so that
+ // close() can drop every cached buffer
+ public final static class ByteBufferAllocatorPool implements ByteBufferPoolShim {
+ private static final class Key implements Comparable<Key> {
+ private final int capacity;
+ private final long insertionGeneration;
+
+ Key(int capacity, long insertionGeneration) {
+ this.capacity = capacity;
+ this.insertionGeneration = insertionGeneration;
+ }
+
+ @Override
+ public int compareTo(Key other) {
+ return ComparisonChain.start().compare(capacity, other.capacity)
+ .compare(insertionGeneration, other.insertionGeneration).result();
+ }
+
+ @Override
+ public boolean equals(Object rhs) {
+ if (rhs == null) {
+ return false;
+ }
+ try {
+ Key o = (Key) rhs;
+ return (compareTo(o) == 0);
+ } catch (ClassCastException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(capacity).append(insertionGeneration)
+ .toHashCode();
+ }
+ }
+
+ private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>();
+
+ private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<Key, ByteBuffer>();
+
+ private long currentGeneration = 0;
+
+ private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
+ return direct ? directBuffers : buffers;
+ }
+
+ public void clear() {
+ buffers.clear();
+ directBuffers.clear();
+ }
+
+ @Override
+ public ByteBuffer getBuffer(boolean direct, int length) {
+ TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
+ Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0));
+ if (entry == null) {
+ return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer
+ .allocate(length);
+ }
+ tree.remove(entry.getKey());
+ return entry.getValue();
+ }
+
+ @Override
+ public void putBuffer(ByteBuffer buffer) {
+ TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
+ while (true) {
+ Key key = new Key(buffer.capacity(), currentGeneration++);
+ if (!tree.containsKey(key)) {
+ tree.put(key, buffer);
+ return;
+ }
+ // Buffers are indexed by (capacity, generation).
+ // If our key is not unique on the first try, we try again
+ }
+ }
+ }
+
RecordReaderImpl(Iterable<StripeInformation> stripes,
FileSystem fileSystem,
Path path,
@@ -95,7 +191,8 @@
boolean[] included,
long strideRate,
SearchArgument sarg,
- String[] columnNames
+ String[] columnNames,
+ Configuration conf
) throws IOException {
this.file = fileSystem.open(path);
this.codec = codec;
@@ -126,6 +223,17 @@
}
}
+ final boolean zeroCopy = (conf != null)
+ && (HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_ZEROCOPY));
+
+ if (zeroCopy
+ && (codec == null || ((codec instanceof DirectDecompressionCodec)
+ && ((DirectDecompressionCodec) codec).isAvailable()))) {
+ this.zcr = ShimLoader.getHadoopShims().getZeroCopyReader(file, pool);
+ } else {
+ this.zcr = null;
+ }
+
firstRow = skippedRows;
totalRowCount = rows;
reader = createTreeReader(path, 0, types, included);
@@ -2276,6 +2384,11 @@ private void clearStreams() throws IOException {
is.close();
}
if(bufferChunks != null) {
+ if(zcr != null) {
+ for (BufferChunk bufChunk : bufferChunks) {
+ zcr.releaseBuffer(bufChunk.chunk);
+ }
+ }
bufferChunks.clear();
}
streams.clear();
@@ -2592,10 +2705,20 @@ static void mergeDiskRanges(List<DiskRange> ranges) {
for(DiskRange range: ranges) {
int len = (int) (range.end - range.offset);
long off = range.offset;
- file.seek(base + off);
- byte[] buffer = new byte[len];
- file.readFully(buffer, 0, buffer.length);
- result.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+ file.seek(base + off);
+ if(zcr != null) {
+ while(len > 0) {
+ ByteBuffer partial = zcr.readBuffer(len, false);
+ result.add(new BufferChunk(partial, off));
+ int read = partial.remaining();
+ len -= read;
+ off += read;
+ }
+ } else {
+ byte[] buffer = new byte[len];
+ file.readFully(buffer, 0, buffer.length);
+ result.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+ }
}
return result;
}
@@ -2833,6 +2956,7 @@ public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOExcept
@Override
public void close() throws IOException {
clearStreams();
+ pool.clear();
file.close();
}
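
The pool above hands out the smallest cached buffer whose capacity covers the request and otherwise allocates a new one; putBuffer() recycles a buffer and the added clear() lets close() drop every cached buffer. A small usage sketch (class name illustrative; it must live in this package because RecordReaderImpl is package-private):

    package org.apache.hadoop.hive.ql.io.orc;

    import java.nio.ByteBuffer;

    // Sketch: recycle a direct buffer through ByteBufferAllocatorPool.
    class ByteBufferAllocatorPoolExample {
      public static void main(String[] args) {
        RecordReaderImpl.ByteBufferAllocatorPool pool =
            new RecordReaderImpl.ByteBufferAllocatorPool();
        ByteBuffer buf = pool.getBuffer(true, 256 * 1024); // direct, >= 256KB
        pool.putBuffer(buf);                               // return it to the pool
        ByteBuffer again = pool.getBuffer(true, 4096);     // reuses the 256KB buffer
        System.out.println(again.capacity());              // 262144
        pool.clear();                                      // drop cached buffers
      }
    }
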
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
index e3131a3..468fec9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
@@ -18,12 +18,17 @@
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
import org.iq80.snappy.Snappy;
import java.io.IOException;
import java.nio.ByteBuffer;
-class SnappyCodec implements CompressionCodec {
+class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
+
+ Boolean direct = null;
@Override
public boolean compress(ByteBuffer in, ByteBuffer out,
@@ -57,6 +62,10 @@ public boolean compress(ByteBuffer in, ByteBuffer out,
@Override
public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+ if(in.isDirect() && out.isDirect()) {
+ directDecompress(in, out);
+ return;
+ }
int inOffset = in.position();
int uncompressLen =
Snappy.uncompress(in.array(), in.arrayOffset() + inOffset,
@@ -64,4 +73,26 @@ public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
out.position(uncompressLen + out.position());
out.flip();
}
+
+ @Override
+ public boolean isAvailable() {
+ if (direct == null) {
+ if (ShimLoader.getHadoopShims().getDirectDecompressor(
+ DirectCompressionType.SNAPPY) != null) {
+ direct = Boolean.valueOf(true);
+ } else {
+ direct = Boolean.valueOf(false);
+ }
+ }
+ return direct.booleanValue();
+ }
+
+ @Override
+ public void directDecompress(ByteBuffer in, ByteBuffer out)
+ throws IOException {
+ DirectDecompressorShim decompressShim = ShimLoader.getHadoopShims()
+ .getDirectDecompressor(DirectCompressionType.SNAPPY);
+ decompressShim.decompress(in, out);
+ out.flip(); // flip for read
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index d904c44..507aa83 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -66,7 +66,8 @@
this.offset = fileSplit.getStart();
this.length = fileSplit.getLength();
- this.reader = file.rows(offset, length, includedColumns, sarg, columnNames);
+ this.reader = file.rows(offset, length, includedColumns, sarg, columnNames, conf);
+
try {
rbCtx = new VectorizedRowBatchCtx();
rbCtx.init(conf, fileSplit);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
index a75fdea..d0dda34 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
@@ -23,7 +23,14 @@
import java.util.zip.Deflater;
import java.util.zip.Inflater;
-class ZlibCodec implements CompressionCodec {
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
+class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
+
+ private Boolean direct = null;
@Override
public boolean compress(ByteBuffer in, ByteBuffer out,
@@ -55,6 +62,12 @@ public boolean compress(ByteBuffer in, ByteBuffer out,
@Override
public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+
+ if(in.isDirect() && out.isDirect()) {
+ directDecompress(in, out);
+ return;
+ }
+
Inflater inflater = new Inflater(true);
inflater.setInput(in.array(), in.arrayOffset() + in.position(),
in.remaining());
@@ -74,4 +87,26 @@ public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
in.position(in.limit());
}
+ @Override
+ public boolean isAvailable() {
+ if (direct == null) {
+ // see nowrap option in new Inflater(boolean) which disables zlib headers
+ if (ShimLoader.getHadoopShims().getDirectDecompressor(
+ DirectCompressionType.ZLIB_NOHEADER) != null) {
+ direct = Boolean.valueOf(true);
+ } else {
+ direct = Boolean.valueOf(false);
+ }
+ }
+ return direct.booleanValue();
+ }
+
+ @Override
+ public void directDecompress(ByteBuffer in, ByteBuffer out)
+ throws IOException {
+ DirectDecompressorShim decompressShim = ShimLoader.getHadoopShims()
+ .getDirectDecompressor(DirectCompressionType.ZLIB_NOHEADER);
+ decompressShim.decompress(in, out);
+ out.flip(); // flip for read
+ }
}
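
ZlibCodec's heap path inflates with new Inflater(true), i.e. raw deflate with no zlib header, which is why the direct decompressor is requested as ZLIB_NOHEADER rather than plain ZLIB. An illustrative JDK-only round trip showing the nowrap pairing (not part of the patch):

    import java.util.zip.DataFormatException;
    import java.util.zip.Deflater;
    import java.util.zip.Inflater;

    // Sketch: compress and decompress a raw deflate stream; both ends must agree
    // on nowrap=true, just as ORC's reader must pick the no-header variant.
    class NowrapRoundTrip {
      public static void main(String[] args) throws DataFormatException {
        byte[] input = "orc zerocopy".getBytes();
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); // nowrap
        deflater.setInput(input);
        deflater.finish();
        byte[] compressed = new byte[64];
        int clen = deflater.deflate(compressed);
        deflater.end();

        Inflater inflater = new Inflater(true); // skip the zlib header on read, too
        inflater.setInput(compressed, 0, clen);
        byte[] output = new byte[64];
        int ulen = inflater.inflate(output);
        inflater.end();
        System.out.println(new String(output, 0, ulen)); // "orc zerocopy"
      }
    }
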
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
index 03fc705..fb4e77a 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
@@ -39,6 +39,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -1417,6 +1419,119 @@ public void testSeek() throws Exception {
rows.close();
}
+ @Test
+ public void testZeroCopySeek() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+ Writer writer = OrcFile.createWriter(testFilePath,
+ OrcFile.writerOptions(conf)
+ .inspector(inspector)
+ .stripeSize(200000)
+ .bufferSize(65536)
+ .rowIndexStride(1000));
+ Random rand = new Random(42);
+ final int COUNT=32768;
+ long[] intValues= new long[COUNT];
+ double[] doubleValues = new double[COUNT];
+ String[] stringValues = new String[COUNT];
+ BytesWritable[] byteValues = new BytesWritable[COUNT];
+ String[] words = new String[128];
+ for(int i=0; i < words.length; ++i) {
+ words[i] = Integer.toHexString(rand.nextInt());
+ }
+ for(int i=0; i < COUNT/2; ++i) {
+ intValues[2*i] = rand.nextLong();
+ intValues[2*i+1] = intValues[2*i];
+ stringValues[2*i] = words[rand.nextInt(words.length)];
+ stringValues[2*i+1] = stringValues[2*i];
+ }
+ for(int i=0; i < COUNT; ++i) {
+ doubleValues[i] = rand.nextDouble();
+ byte[] buf = new byte[20];
+ rand.nextBytes(buf);
+ byteValues[i] = new BytesWritable(buf);
+ }
+ for(int i=0; i < COUNT; ++i) {
+ writer.addRow(createRandomRow(intValues, doubleValues, stringValues,
+ byteValues, words, i));
+ }
+ writer.close();
+ writer = null;
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ assertEquals(COUNT, reader.getNumberOfRows());
+ /* enable zero copy record reader */
+ Configuration conf = new Configuration();
+ HiveConf.setBoolVar(conf, ConfVars.HIVE_ORC_ZEROCOPY, true);
+ RecordReader rows = reader.rows(0, Long.MAX_VALUE, null, null, null, conf);
+ /* all tests are identical to the other seek() tests */
+ OrcStruct row = null;
+ for(int i=COUNT-1; i >= 0; --i) {
+ rows.seekToRow(i);
+ row = (OrcStruct) rows.next(row);
+ BigRow expected = createRandomRow(intValues, doubleValues,
+ stringValues, byteValues, words, i);
+ assertEquals(expected.boolean1.booleanValue(),
+ ((BooleanWritable) row.getFieldValue(0)).get());
+ assertEquals(expected.byte1.byteValue(),
+ ((ByteWritable) row.getFieldValue(1)).get());
+ assertEquals(expected.short1.shortValue(),
+ ((ShortWritable) row.getFieldValue(2)).get());
+ assertEquals(expected.int1.intValue(),
+ ((IntWritable) row.getFieldValue(3)).get());
+ assertEquals(expected.long1.longValue(),
+ ((LongWritable) row.getFieldValue(4)).get());
+ assertEquals(expected.float1.floatValue(),
+ ((FloatWritable) row.getFieldValue(5)).get(), 0.0001);
+ assertEquals(expected.double1.doubleValue(),
+ ((DoubleWritable) row.getFieldValue(6)).get(), 0.0001);
+ assertEquals(expected.bytes1, row.getFieldValue(7));
+ assertEquals(expected.string1, row.getFieldValue(8));
+ List expectedList = expected.middle.list;
+ List actualList =
+ (List) ((OrcStruct) row.getFieldValue(9)).getFieldValue(0);
+ compareList(expectedList, actualList);
+ compareList(expected.list, (List) row.getFieldValue(10));
+ }
+ rows.close();
+ Iterator<StripeInformation> stripeIterator =
+ reader.getStripes().iterator();
+ long offsetOfStripe2 = 0;
+ long offsetOfStripe4 = 0;
+ long lastRowOfStripe2 = 0;
+ for(int i = 0; i < 5; ++i) {
+ StripeInformation stripe = stripeIterator.next();
+ if (i < 2) {
+ lastRowOfStripe2 += stripe.getNumberOfRows();
+ } else if (i == 2) {
+ offsetOfStripe2 = stripe.getOffset();
+ lastRowOfStripe2 += stripe.getNumberOfRows() - 1;
+ } else if (i == 4) {
+ offsetOfStripe4 = stripe.getOffset();
+ }
+ }
+ boolean[] columns = new boolean[reader.getStatistics().length];
+ columns[5] = true; // long column
+ columns[9] = true; // text column
+ /* use zero copy record reader */
+ rows = reader.rows(offsetOfStripe2, offsetOfStripe4 - offsetOfStripe2,
+ columns, null, null, conf);
+ rows.seekToRow(lastRowOfStripe2);
+ for(int i = 0; i < 2; ++i) {
+ row = (OrcStruct) rows.next(row);
+ BigRow expected = createRandomRow(intValues, doubleValues,
+ stringValues, byteValues, words,
+ (int) (lastRowOfStripe2 + i));
+
+ assertEquals(expected.long1.longValue(),
+ ((LongWritable) row.getFieldValue(4)).get());
+ assertEquals(expected.string1, row.getFieldValue(8));
+ }
+ rows.close();
+ }
+
private void compareInner(InnerStruct expect,
OrcStruct actual) throws Exception {
if (expect == null || actual == null) {