diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 84ee78f..1fb8161 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -523,6 +523,8 @@
     HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
     HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),
     HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10),
+
+    HIVE_ORC_ZEROCOPY("hive.orc.zerocopy", false),
 
     HIVESKEWJOIN("hive.optimize.skewjoin", false),
     HIVECONVERTJOIN("hive.auto.convert.join", true),
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/DirectDecompressionCodec.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/DirectDecompressionCodec.java
new file mode 100644
index 0000000..41a77b0
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/DirectDecompressionCodec.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface DirectDecompressionCodec extends CompressionCodec {
+  public boolean isAvailable();
+  public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException;
+}
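For orientation, a minimal sketch (not part of the patch) of how a job would switch the new flag on and how the input formats read it back. Only the property name hive.orc.zerocopy and the HIVE_ORC_ZEROCOPY ConfVars entry come from the patch; the class and variable names below are illustrative.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    public class ZeroCopyConfigExample {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // The new ConfVars entry defaults to false; enable it explicitly.
        job.setBoolean(HiveConf.ConfVars.HIVE_ORC_ZEROCOPY.varname, true);
        // OrcInputFormat and VectorizedOrcInputFormat read it back the same way the patch does:
        boolean zeroCopy = HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_ORC_ZEROCOPY);
        System.out.println("zero copy ORC reads enabled: " + zeroCopy);
      }
    }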
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
index 6da3d03..cb1cdb4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
@@ -21,8 +21,13 @@
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 abstract class InStream extends InputStream {
+  private static final Log LOG = LogFactory.getLog(InStream.class);
+
   private static class UncompressedStream extends InStream {
     private final String name;
     private final ByteBuffer[] bytes;
@@ -172,7 +177,7 @@ private void readHeader() throws IOException {
                                 bufferSize + " needed = " + chunkLength);
       }
       // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
-      assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
+      assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
       currentOffset += OutStream.HEADER_SIZE;
 
       ByteBuffer slice = this.slice(chunkLength);
@@ -273,15 +278,24 @@ private ByteBuffer slice(int chunkLength) throws IOException {
         throw new IOException("EOF in " + this + " while trying to read " +
                               chunkLength + " bytes");
       }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format(
+            "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)",
+            compressed.remaining(), len));
+      }
 
       // we need to consolidate 2 or more buffers into 1
-      // first clear out compressed buffers
+      // first copy out compressed buffers
       ByteBuffer copy = allocateBuffer(chunkLength);
       currentOffset += compressed.remaining();
       len -= compressed.remaining();
       copy.put(compressed);
 
       while (len > 0 && (++currentRange) < bytes.length) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
+        }
         compressed = bytes[currentRange].duplicate();
         if (compressed.remaining() >= len) {
           slice = compressed.slice();
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index a5747a6..9de6dcc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -114,7 +114,8 @@
       boolean[] includedColumns = findIncludedColumns(types, conf);
       String[] columnNames = getIncludedColumnNames(types, includedColumns, conf);
       SearchArgument sarg = createSarg(types, conf);
-      this.reader = file.rows(offset, length, includedColumns, sarg, columnNames);
+      boolean zeroCopy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_ZEROCOPY);
+      this.reader = file.rows(offset, length, includedColumns, sarg, columnNames, zeroCopy);
       this.offset = offset;
       this.length = length;
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 2bab0ce..b6e6cf8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -185,5 +185,23 @@ RecordReader rows(long offset, long length,
   RecordReader rows(long offset, long length,
                     boolean[] include, SearchArgument sarg,
                     String[] neededColumns) throws IOException;
+
+  /**
+   * Create a RecordReader that will read a section of a file. It starts reading
+   * at the first stripe after the offset and continues to the stripe that
+   * starts at offset + length. It also accepts a list of columns to read and a
+   * search argument. Use a zero copy record reader if zeroCopy is set.
+   * @param offset the minimum offset of the first stripe to read
+   * @param length the distance from offset of the first address to stop reading
+   *   at
+   * @param include true for each column that should be included
+   * @param sarg a search argument that limits the rows that should be read.
+   * @param neededColumns the names of the included columns
+   * @param zeroCopy enable zero copy reads
+   * @return the record reader for the rows
+   */
+  RecordReader rows(long offset, long length,
+                    boolean[] include, SearchArgument sarg,
+                    String[] neededColumns, boolean zeroCopy) throws IOException;
 }
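A caller-side sketch of the new overload (the OrcFile.createReader, rows, next and close calls are the ones exercised by the test later in this patch; the file path and class name are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Reader;
    import org.apache.hadoop.hive.ql.io.orc.RecordReader;

    public class ZeroCopyReadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path path = new Path("/tmp/example.orc");   // hypothetical ORC file
        Reader reader = OrcFile.createReader(fs, path);
        // null include/sarg/column names = read everything; the last argument enables zero copy
        RecordReader rows = reader.rows(0, Long.MAX_VALUE, null, null, null, true);
        Object row = null;
        while (rows.hasNext()) {
          row = rows.next(row);
        }
        rows.close();
      }
    }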
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 56f25b7..f1843a4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -478,6 +478,13 @@ public RecordReader rows(long offset, long length, boolean[] include
   public RecordReader rows(long offset, long length, boolean[] include,
                            SearchArgument sarg, String[] columnNames
                            ) throws IOException {
+    return rows(offset, length, include, sarg, columnNames, false);
+  }
+
+  @Override
+  public RecordReader rows(long offset, long length, boolean[] include,
+                           SearchArgument sarg, String[] columnNames,
+                           boolean zeroCopy) throws IOException {
     // if included columns is null, then include all columns
     if (include == null) {
@@ -487,7 +494,7 @@ public RecordReader rows(long offset, long length, boolean[] include,
     return new RecordReaderImpl(this.getStripes(), fileSystem, path, offset,
         length, footer.getTypesList(), codec, bufferSize,
-        include, footer.getRowIndexStride(), sarg, columnNames);
+        include, footer.getRowIndexStride(), sarg, columnNames, zeroCopy);
   }
 
   @Override
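Because the old five-argument overload now just forwards with zeroCopy set to false, existing callers keep their current read path; given a reader as in the sketch above, the two calls below are equivalent:

    RecordReader a = reader.rows(0, Long.MAX_VALUE, null, null, null);
    RecordReader b = reader.rows(0, Long.MAX_VALUE, null, null, null, false); // same behavior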
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 0df82b9..1ef8c54 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -27,7 +27,9 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -49,6 +51,9 @@
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
+import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -56,6 +61,8 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
+import com.google.common.collect.ComparisonChain;
+
 class RecordReaderImpl implements RecordReader {
   private static final Log LOG =
     LogFactory.getLog(RecordReaderImpl.class);
@@ -87,6 +94,94 @@
   private final int[] filterColumns;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
+
+  private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
+  private final ZeroCopyReaderShim zcr;
+
+  // this is an implementation copied from ElasticByteBufferPool in hadoop-2,
+  // which lacks a clear()/clean() operation
+  public final static class ByteBufferAllocatorPool implements ByteBufferPoolShim {
+    private static final class Key implements Comparable<Key> {
+      private final int capacity;
+      private final long insertionTime;
+
+      Key(int capacity, long insertionTime) {
+        this.capacity = capacity;
+        this.insertionTime = insertionTime;
+      }
+
+      @Override
+      public int compareTo(Key other) {
+        return ComparisonChain.start().compare(capacity, other.capacity)
+            .compare(insertionTime, other.insertionTime).result();
+      }
+
+      @Override
+      public boolean equals(Object rhs) {
+        if (rhs == null) {
+          return false;
+        }
+        try {
+          Key o = (Key) rhs;
+          return (compareTo(o) == 0);
+        } catch (ClassCastException e) {
+          return false;
+        }
+      }
+
+      @Override
+      public int hashCode() {
+        return new HashCodeBuilder().append(capacity).append(insertionTime)
+            .toHashCode();
+      }
+    }
+
+    private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>();
+
+    private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<Key, ByteBuffer>();
+
+    private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
+      return direct ? directBuffers : buffers;
+    }
+
+    public void clear() {
+      buffers.clear();
+      /*for(ByteBuffer dbuf: directBuffers.values()) {
+        Cleaner cleaner = ((DirectBuffer) dbuf).cleaner();
+        if (cleaner != null) cleaner.clean();
+      }*/
+      directBuffers.clear();
+    }
+
+    @Override
+    public ByteBuffer getBuffer(boolean direct, int length) {
+      TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
+      Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0));
+      if (entry == null) {
+        return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer
+            .allocate(length);
+      }
+      tree.remove(entry.getKey());
+      return entry.getValue();
+    }
+
+    @Override
+    public void putBuffer(ByteBuffer buffer) {
+      TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
+      while (true) {
+        Key key = new Key(buffer.capacity(), System.nanoTime());
+        if (!tree.containsKey(key)) {
+          tree.put(key, buffer);
+          return;
+        }
+        // Buffers are indexed by (capacity, time).
+        // If our key is not unique on the first try, we try again, since the
+        // time will be different. Since we use nanoseconds, it's pretty
+        // unlikely that we'll loop even once, unless the system clock has a
+        // poor granularity.
+      }
+    }
+  }
 
   RecordReaderImpl(Iterable<StripeInformation> stripes,
                    FileSystem fileSystem,
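To make the pool's contract concrete, a standalone sketch of the intended reuse pattern (assumes it runs inside the org.apache.hadoop.hive.ql.io.orc package, where ByteBufferAllocatorPool is visible; the sizes and class name are illustrative):

    package org.apache.hadoop.hive.ql.io.orc;

    import java.nio.ByteBuffer;

    public class PoolReuseExample {
      public static void main(String[] args) {
        RecordReaderImpl.ByteBufferAllocatorPool pool =
            new RecordReaderImpl.ByteBufferAllocatorPool();
        ByteBuffer first = pool.getBuffer(true, 256 * 1024);  // nothing pooled yet: fresh direct buffer
        pool.putBuffer(first);                                // hand it back, keyed by (capacity, time)
        ByteBuffer second = pool.getBuffer(true, 128 * 1024); // ceilingEntry finds the pooled 256KB buffer
        System.out.println(first == second);                  // true: the buffer was reused
        pool.clear();                                         // drop all pooled buffers
      }
    }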
@@ -98,7 +193,8 @@
                    boolean[] included,
                    long strideRate,
                    SearchArgument sarg,
-                   String[] columnNames
+                   String[] columnNames,
+                   boolean zeroCopy
                    ) throws IOException {
     this.file = fileSystem.open(path);
     this.codec = codec;
@@ -129,6 +225,14 @@
       }
     }
+
+    if (zeroCopy
+        && (codec == null || ((codec instanceof DirectDecompressionCodec)
+            && ((DirectDecompressionCodec) codec).isAvailable()))) {
+      this.zcr = ShimLoader.getHadoopShims().getZeroCopyReader(file, pool);
+    } else {
+      this.zcr = null;
+    }
+
     firstRow = skippedRows;
     totalRowCount = rows;
     reader = createTreeReader(path, 0, types, included);
@@ -2236,6 +2340,11 @@ private void clearStreams() throws IOException {
       is.close();
     }
     if(bufferChunks != null) {
+      if(zcr != null) {
+        for (BufferChunk bufChunk : bufferChunks) {
+          zcr.releaseBuffer(bufChunk.chunk);
+        }
+      }
       bufferChunks.clear();
     }
     streams.clear();
@@ -2545,10 +2654,20 @@ static void mergeDiskRanges(List<DiskRange> ranges) {
     for(DiskRange range: ranges) {
       int len = (int) (range.end - range.offset);
       long off = range.offset;
-      file.seek(base + off);
-      byte[] buffer = new byte[len];
-      file.readFully(buffer, 0, buffer.length);
-      result.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+      file.seek(base + off);
+      if(zcr != null) {
+        while(len > 0) {
+          ByteBuffer partial = zcr.readBuffer(len, false);
+          result.add(new BufferChunk(partial, off));
+          int read = partial.remaining();
+          len -= read;
+          off += read;
+        }
+      } else {
+        byte[] buffer = new byte[len];
+        file.readFully(buffer, 0, buffer.length);
+        result.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+      }
     }
     return result;
   }
@@ -2756,6 +2875,7 @@ public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOExcept
   @Override
   public void close() throws IOException {
     clearStreams();
+    pool.clear();
     file.close();
   }
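In isolation, the shim contract the reader now depends on: readBuffer may return fewer bytes than requested, so callers loop, and every returned buffer must later be handed back through releaseBuffer, which is what clearStreams() does for each BufferChunk. A sketch follows; only readBuffer and releaseBuffer, with the arguments used in the patch, come from the source, while the helper class itself is illustrative.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;

    public class ZeroCopyLoopExample {
      // Mirror of the loop added to the disk-range read above: gather `len` bytes
      // from the stream's current position, possibly in several partial buffers.
      static List<ByteBuffer> readFully(ZeroCopyReaderShim zcr, int len) throws IOException {
        List<ByteBuffer> chunks = new ArrayList<ByteBuffer>();
        while (len > 0) {
          ByteBuffer partial = zcr.readBuffer(len, false); // may return a short buffer
          chunks.add(partial);
          len -= partial.remaining();
        }
        return chunks;
      }

      // Mirror of clearStreams(): return every borrowed buffer to the underlying stream.
      static void release(ZeroCopyReaderShim zcr, List<ByteBuffer> chunks) throws IOException {
        for (ByteBuffer b : chunks) {
          zcr.releaseBuffer(b);
        }
      }
    }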
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
index e3131a3..bec8041 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
@@ -18,13 +18,18 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
 import org.iq80.snappy.Snappy;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-class SnappyCodec implements CompressionCodec {
+class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
+  Boolean direct = null;
+
   @Override
   public boolean compress(ByteBuffer in, ByteBuffer out,
                           ByteBuffer overflow) throws IOException {
@@ -57,6 +62,10 @@ public boolean compress(ByteBuffer in, ByteBuffer out,
 
   @Override
   public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+    if(in.isDirect() && out.isDirect()) {
+      directDecompress(in, out);
+      return;
+    }
     int inOffset = in.position();
     int uncompressLen =
         Snappy.uncompress(in.array(), in.arrayOffset() + inOffset,
@@ -64,4 +73,27 @@ public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
     out.position(uncompressLen + out.position());
     out.flip();
   }
+
+  @Override
+  public boolean isAvailable() {
+    if (direct == null) {
+      // see nowrap option in new Inflater(boolean) which disables zlib headers
+      if (ShimLoader.getHadoopShims().getDirectDecompressor(
+          DirectCompressionType.SNAPPY) != null) {
+        direct = Boolean.valueOf(true);
+      } else {
+        direct = Boolean.valueOf(false);
+      }
+    }
+    return direct.booleanValue();
+  }
+
+  @Override
+  public void directDecompress(ByteBuffer in, ByteBuffer out)
+      throws IOException {
+    DirectDecompressorShim decompressShim = ShimLoader.getHadoopShims()
+        .getDirectDecompressor(DirectCompressionType.SNAPPY);
+    decompressShim.decompress(in, out);
+    out.flip(); // flip for read
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index d904c44..6fcda48 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -63,10 +63,12 @@
       boolean[] includedColumns = OrcInputFormat.findIncludedColumns(types, conf);
       String[] columnNames = OrcInputFormat.getIncludedColumnNames(types, includedColumns, conf);
       SearchArgument sarg = OrcInputFormat.createSarg(types, conf);
+      boolean zeroCopy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_ZEROCOPY);
       this.offset = fileSplit.getStart();
       this.length = fileSplit.getLength();
-      this.reader = file.rows(offset, length, includedColumns, sarg, columnNames);
+      this.reader = file.rows(offset, length, includedColumns, sarg, columnNames, zeroCopy);
+
       try {
         rbCtx = new VectorizedRowBatchCtx();
         rbCtx.init(conf, fileSplit);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
index a75fdea..f277a93 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
@@ -23,8 +23,15 @@
 import java.util.zip.Deflater;
 import java.util.zip.Inflater;
 
-class ZlibCodec implements CompressionCodec {
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
 
+class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
+
+  private Boolean direct = null;
+
   @Override
   public boolean compress(ByteBuffer in, ByteBuffer out,
                           ByteBuffer overflow) throws IOException {
@@ -55,6 +62,12 @@ public boolean compress(ByteBuffer in, ByteBuffer out,
 
   @Override
   public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+
+    if(in.isDirect() && out.isDirect()) {
+      directDecompress(in, out);
+      return;
+    }
+
     Inflater inflater = new Inflater(true);
     inflater.setInput(in.array(), in.arrayOffset() + in.position(),
                       in.remaining());
@@ -74,4 +87,26 @@ public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
     in.position(in.limit());
   }
 
+  @Override
+  public boolean isAvailable() {
+    if (direct == null) {
+      // see nowrap option in new Inflater(boolean) which disables zlib headers
+      if (ShimLoader.getHadoopShims().getDirectDecompressor(
+          DirectCompressionType.ZLIB_NOHEADER) != null) {
+        direct = Boolean.valueOf(true);
+      } else {
+        direct = Boolean.valueOf(false);
+      }
+    }
+    return direct.booleanValue();
+  }
+
+  @Override
+  public void directDecompress(ByteBuffer in, ByteBuffer out)
+      throws IOException {
+    DirectDecompressorShim decompressShim = ShimLoader.getHadoopShims()
+        .getDirectDecompressor(DirectCompressionType.ZLIB_NOHEADER);
+    decompressShim.decompress(in, out);
+    out.flip(); // flip for read
+  }
 }
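Pulling the codec-side rule out for clarity: direct-to-direct buffers can go through the Hadoop direct decompressor shim, everything else stays on the existing heap-array path. The sketch below is illustrative (the availability probe and the isDirect split are the ones added above; the helper class is hypothetical and assumes it sits in the org.apache.hadoop.hive.ql.io.orc package, where the codec interfaces are visible):

    package org.apache.hadoop.hive.ql.io.orc;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class DirectDecompressExample {
      static void decompress(DirectDecompressionCodec codec, ByteBuffer in, ByteBuffer out)
          throws IOException {
        if (in.isDirect() && out.isDirect() && codec.isAvailable()) {
          codec.directDecompress(in, out);  // shim decompression; the codec flips `out` for reading
        } else {
          codec.decompress(in, out);        // original array-based path
        }
      }
    }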
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
index 03fc705..491033a 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
@@ -1416,6 +1416,117 @@ public void testSeek() throws Exception {
     }
     rows.close();
   }
+
+  @Test
+  public void testZeroCopySeek() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .inspector(inspector)
+            .stripeSize(200000)
+            .bufferSize(65536)
+            .rowIndexStride(1000));
+    Random rand = new Random(42);
+    final int COUNT=32768;
+    long[] intValues= new long[COUNT];
+    double[] doubleValues = new double[COUNT];
+    String[] stringValues = new String[COUNT];
+    BytesWritable[] byteValues = new BytesWritable[COUNT];
+    String[] words = new String[128];
+    for(int i=0; i < words.length; ++i) {
+      words[i] = Integer.toHexString(rand.nextInt());
+    }
+    for(int i=0; i < COUNT/2; ++i) {
+      intValues[2*i] = rand.nextLong();
+      intValues[2*i+1] = intValues[2*i];
+      stringValues[2*i] = words[rand.nextInt(words.length)];
+      stringValues[2*i+1] = stringValues[2*i];
+    }
+    for(int i=0; i < COUNT; ++i) {
+      doubleValues[i] = rand.nextDouble();
+      byte[] buf = new byte[20];
+      rand.nextBytes(buf);
+      byteValues[i] = new BytesWritable(buf);
+    }
+    for(int i=0; i < COUNT; ++i) {
+      writer.addRow(createRandomRow(intValues, doubleValues, stringValues,
+          byteValues, words, i));
+    }
+    writer.close();
+    writer = null;
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    assertEquals(COUNT, reader.getNumberOfRows());
+    /* enable zero copy record reader */
+    RecordReader rows = reader.rows(0, Long.MAX_VALUE, null, null, null, true);
+    /* all tests are identical to the other seek() tests */
+    OrcStruct row = null;
+    for(int i=COUNT-1; i >= 0; --i) {
+      rows.seekToRow(i);
+      row = (OrcStruct) rows.next(row);
+      BigRow expected = createRandomRow(intValues, doubleValues,
+          stringValues, byteValues, words, i);
+      assertEquals(expected.boolean1.booleanValue(),
+          ((BooleanWritable) row.getFieldValue(0)).get());
+      assertEquals(expected.byte1.byteValue(),
+          ((ByteWritable) row.getFieldValue(1)).get());
+      assertEquals(expected.short1.shortValue(),
+          ((ShortWritable) row.getFieldValue(2)).get());
+      assertEquals(expected.int1.intValue(),
+          ((IntWritable) row.getFieldValue(3)).get());
+      assertEquals(expected.long1.longValue(),
+          ((LongWritable) row.getFieldValue(4)).get());
+      assertEquals(expected.float1.floatValue(),
+          ((FloatWritable) row.getFieldValue(5)).get(), 0.0001);
+      assertEquals(expected.double1.doubleValue(),
+          ((DoubleWritable) row.getFieldValue(6)).get(), 0.0001);
+      assertEquals(expected.bytes1, row.getFieldValue(7));
+      assertEquals(expected.string1, row.getFieldValue(8));
+      List<InnerStruct> expectedList = expected.middle.list;
+      List<InnerStruct> actualList =
+          (List) ((OrcStruct) row.getFieldValue(9)).getFieldValue(0);
+      compareList(expectedList, actualList);
+      compareList(expected.list, (List) row.getFieldValue(10));
+    }
+    rows.close();
+    Iterator<StripeInformation> stripeIterator =
+        reader.getStripes().iterator();
+    long offsetOfStripe2 = 0;
+    long offsetOfStripe4 = 0;
+    long lastRowOfStripe2 = 0;
+    for(int i = 0; i < 5; ++i) {
+      StripeInformation stripe = stripeIterator.next();
+      if (i < 2) {
+        lastRowOfStripe2 += stripe.getNumberOfRows();
+      } else if (i == 2) {
+        offsetOfStripe2 = stripe.getOffset();
+        lastRowOfStripe2 += stripe.getNumberOfRows() - 1;
+      } else if (i == 4) {
+        offsetOfStripe4 = stripe.getOffset();
+      }
+    }
+    boolean[] columns = new boolean[reader.getStatistics().length];
+    columns[5] = true; // long column
+    columns[9] = true; // text column
+    /* enable zero copy record reader */
+    rows = reader.rows(offsetOfStripe2, offsetOfStripe4 - offsetOfStripe2,
+        columns, null, null, true);
+    rows.seekToRow(lastRowOfStripe2);
+    for(int i = 0; i < 2; ++i) {
+      row = (OrcStruct) rows.next(row);
+      BigRow expected = createRandomRow(intValues, doubleValues,
+          stringValues, byteValues, words,
+          (int) (lastRowOfStripe2 + i));
+
+      assertEquals(expected.long1.longValue(),
+          ((LongWritable) row.getFieldValue(4)).get());
+      assertEquals(expected.string1, row.getFieldValue(8));
+    }
+    rows.close();
+  }
 
   private void compareInner(InnerStruct expect, OrcStruct actual) throws Exception {
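The second half of the test combines zero copy with column projection and a stripe-bounded read; condensed into a reusable sketch (the column indices and class name are illustrative, and the stripe offset/length would come from Reader.getStripes() as in the test):

    import java.io.IOException;

    import org.apache.hadoop.hive.ql.io.orc.Reader;
    import org.apache.hadoop.hive.ql.io.orc.RecordReader;

    public class ProjectedZeroCopyRead {
      static void readProjected(Reader reader, long stripeOffset, long stripeLength)
          throws IOException {
        boolean[] columns = new boolean[reader.getStatistics().length];
        columns[5] = true;   // keep only the columns you need
        columns[9] = true;
        RecordReader rows = reader.rows(stripeOffset, stripeLength,
            columns, null, null, true);  // no sarg/column names, zero copy on
        Object row = null;
        while (rows.hasNext()) {
          row = rows.next(row);
        }
        rows.close();
      }
    }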