commit bbd1a52ace2a0ca2fe0cb838bd60277d29c9e4e8
Author: Owen O'Malley
Date:   Wed Mar 26 16:58:00 2014 -0700

    HIVE-6759. Fix reading partial orc files as they are being written.

diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 018452e..a6483ad 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -315,8 +315,15 @@ private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
                                         ) throws IOException {
     FSDataInputStream file = fs.open(path);

+    // figure out the size of the file using the option or filesystem
+    long size;
+    if (maxFileLength == Long.MAX_VALUE) {
+      size = fs.getFileStatus(path).getLen();
+    } else {
+      size = maxFileLength;
+    }
+
     //read last bytes into buffer to get PostScript
-    long size = Math.min(maxFileLength, fs.getFileStatus(path).getLen());
     int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
     file.seek(size - readSize);
     ByteBuffer buffer = ByteBuffer.allocate(readSize);

diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index e6e6e41..680af8f 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -134,7 +134,7 @@ private static String value(OrcStruct event) {
   }

   // can add .verboseLogging() to cause Mockito to log invocations
-  private final MockSettings settings = Mockito.withSettings().verboseLogging();
+  private final MockSettings settings = Mockito.withSettings();

   private Reader createMockReader() throws IOException {
     Reader reader = Mockito.mock(Reader.class, settings);

diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
index bc21cc4..22e4724 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
@@ -23,19 +23,139 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;

+import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;

+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.Location;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
 import org.apache.hadoop.hive.ql.io.sarg.TestSearchArgumentImpl;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.junit.Test;
+import org.mockito.MockSettings;
+import org.mockito.Mockito;

 public class TestRecordReaderImpl {
+  // can add .verboseLogging() to cause Mockito to log invocations
+  private final MockSettings settings = Mockito.withSettings().verboseLogging();
+
+  static class BufferInStream
+      extends InputStream implements PositionedReadable, Seekable {
+    private final byte[] buffer;
+    private final int length;
+    private int position = 0;
+
+    BufferInStream(byte[] bytes, int length) {
+      this.buffer = bytes;
+      this.length = length;
+    }
+
+    @Override
+    public int read() {
+      if (position < length) {
+        return buffer[position++];
+      }
+      return -1;
+    }
+
+    @Override
+    public int read(byte[] bytes, int offset, int length) {
+      int lengthToRead = Math.min(length, this.length - this.position);
+      if (lengthToRead >= 0) {
+        for(int i=0; i < lengthToRead; ++i) {
+          bytes[offset + i] = buffer[position++];
+        }
+        return lengthToRead;
+      } else {
+        return -1;
+      }
+    }
+
+    @Override
+    public int read(long position, byte[] bytes, int offset, int length) {
+      this.position = (int) position;
+      return read(bytes, offset, length);
+    }
+
+    @Override
+    public void readFully(long position, byte[] bytes, int offset,
+                          int length) throws IOException {
+      this.position = (int) position;
+      while (length > 0) {
+        int result = read(bytes, offset, length);
+        offset += result;
+        length -= result;
+        if (result < 0) {
+          throw new IOException("Read past end of buffer at " + offset);
+        }
+      }
+    }
+
+    @Override
+    public void readFully(long position, byte[] bytes) throws IOException {
+      readFully(position, bytes, 0, bytes.length);
+    }
+
+    @Override
+    public void seek(long position) {
+      this.position = (int) position;
+    }
+
+    @Override
+    public long getPos() {
+      return position;
+    }
+
+    @Override
+    public boolean seekToNewSource(long position) throws IOException {
+      this.position = (int) position;
+      return false;
+    }
+  }
+
+  @Test
+  public void testMaxLengthToReader() throws Exception {
+    Configuration conf = new Configuration();
+    OrcProto.Type rowType = OrcProto.Type.newBuilder()
+        .setKind(OrcProto.Type.Kind.STRUCT).build();
+    OrcProto.Footer footer = OrcProto.Footer.newBuilder()
+        .setHeaderLength(0).setContentLength(0).setNumberOfRows(0)
+        .setRowIndexStride(0).addTypes(rowType).build();
+    OrcProto.PostScript ps = OrcProto.PostScript.newBuilder()
+        .setCompression(OrcProto.CompressionKind.NONE)
+        .setFooterLength(footer.getSerializedSize())
+        .setMagic("ORC").addVersion(0).addVersion(11).build();
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    footer.writeTo(buffer);
+    ps.writeTo(buffer);
+    buffer.write(ps.getSerializedSize());
+    FileSystem fs = Mockito.mock(FileSystem.class, settings);
+    FSDataInputStream file =
+        new FSDataInputStream(new BufferInStream(buffer.getData(),
+            buffer.getLength()));
+    Path p = new Path("/dir/file.orc");
+    Mockito.when(fs.open(p)).thenReturn(file);
+    OrcFile.ReaderOptions options = OrcFile.readerOptions(conf);
+    options.filesystem(fs);
+    options.maxLength(buffer.getLength());
+    Mockito.when(fs.getFileStatus(p))
+        .thenReturn(new FileStatus(10, false, 3, 3000, 0, p));
+    Reader reader = OrcFile.createReader(p, options);
+  }
+
   @Test
   public void testCompareToRangeInt() throws Exception {
     assertEquals(Location.BEFORE,
@@ -671,7 +791,7 @@ public void testPartialPlan() throws Exception {
     rowGroups = null;
     columns = new boolean[]{true, false, true};
     result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
-        columns, rowGroups, false, encodings, types, 32768);
+        columns, null, false, encodings, types, 32768);
     assertThat(result, is(diskRanges(100000, 102000, 102000, 200000)));

     rowGroups = new boolean[]{false, true, false, false, false, false};
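
Editor's note (not part of the commit): a minimal sketch of how a caller might use the maxLength reader option that this patch makes authoritative. With the change, a caller-supplied length is used as-is instead of being min()'d with fs.getFileStatus(path).getLen(), so the PostScript is located at the end of the prefix the caller knows to be complete rather than at whatever length the filesystem reports for a file that is still open for writing. The class name, path, and knownGoodLength below are hypothetical; only OrcFile.readerOptions, ReaderOptions.filesystem, ReaderOptions.maxLength, and OrcFile.createReader come from the patch and its test.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;

public class PartialOrcReadSketch {
  /**
   * Open the stable prefix of an ORC file that may still be growing.
   * knownGoodLength is bookkeeping the caller maintains (for example, the file
   * length recorded at the writer's last flush); the patch does not supply it.
   */
  public static Reader openPrefix(Configuration conf, Path path,
                                  long knownGoodLength) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    OrcFile.ReaderOptions options = OrcFile.readerOptions(conf);
    options.filesystem(fs);
    // Only bytes in [0, knownGoodLength) are considered when locating the
    // PostScript and footer, regardless of the length the filesystem reports.
    options.maxLength(knownGoodLength);
    return OrcFile.createReader(path, options);
  }
}

This mirrors the non-chained options style used in testMaxLengthToReader above; omitting maxLength leaves it at Long.MAX_VALUE, in which case ReaderImpl falls back to fs.getFileStatus(path).getLen(), exactly as in the changed hunk.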