diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java
index cdc0372..7977d64 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java
@@ -82,8 +82,7 @@ public MetadataReader(FSDataInputStream file,
       }
       if ((included == null || included[col]) && indexes[col] == null) {
         byte[] buffer = new byte[len];
-        file.seek(offset);
-        file.readFully(buffer);
+        OrcUtils.readFromStream(file, offset, buffer, 0, buffer.length);
         ByteBuffer[] bb = new ByteBuffer[] {ByteBuffer.wrap(buffer)};
         indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
             bb, new long[]{0}, stream.getLength(), codec, bufferSize));
@@ -108,8 +107,7 @@ public MetadataReader(FSDataInputStream file,
 
     // read the footer
     ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
-    file.seek(offset);
-    file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+    OrcUtils.readFromStream(file, offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
     return OrcProto.StripeFooter.parseFrom(InStream.create("footer",
         Lists.newArrayList(new BufferChunk(tailBuf, 0)), tailLength, codec,
         bufferSize));
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
index db2ca15..2e256a0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
+import java.io.EOFException;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -24,6 +26,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -204,4 +207,38 @@ public static int getFlattenedColumnsCount(ObjectInspector inspector) {
     return numWriters;
   }
 
+  /**
+   * Read bytes from the given position in the stream to the given buffer.
+   * Continues to read until length bytes have been read.
+   *
+   * @param file Stream to read from
+   * @param position position in the input stream to seek
+   * @param buffer buffer into which data is read
+   * @param offset offset into the buffer in which data is written
+   * @param length the number of bytes to read
+   * @throws EOFException If the end of stream is reached while reading.
+   *                      If an exception is thrown an undetermined number
+   *                      of bytes in the buffer may have been written.
+   */
+  public static void readFromStream(FSDataInputStream file, long position,
+      byte[] buffer, int offset, int length) throws IOException {
+    /**
+     * HIVE-11945. readFully approach has additional seek in S3.
+     */
+    if (file.getWrappedStream().getClass().getName()
+        .equalsIgnoreCase("org.apache.hadoop.fs.s3a.S3AInputStream")) {
+      file.seek(position);
+      int nread = 0;
+      while (nread < length) {
+        int nbytes = file.read(buffer, offset + nread, length - nread);
+        if (nbytes < 0) {
+          throw new EOFException("End of file reached before reading fully.");
+        }
+        nread += nbytes;
+      }
+    } else {
+      file.readFully(position, buffer, offset, length);
+    }
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index ab539c4..68ad356 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -244,9 +244,8 @@ static void ensureOrcFooter(FSDataInputStream in,
     if (!Text.decode(array, offset, len).equals(OrcFile.MAGIC)) {
       // If it isn't there, this may be the 0.11.0 version of ORC.
       // Read the first 3 bytes of the file to check for the header
-      in.seek(0);
       byte[] header = new byte[len];
-      in.readFully(header, 0, len);
+      OrcUtils.readFromStream(in, 0, header, 0, len);
       // if it isn't there, this isn't an ORC file
       if (!Text.decode(header, 0 , len).equals(OrcFile.MAGIC)) {
         throw new FileFormatException("Malformed ORC file " + path +
@@ -472,10 +471,10 @@ private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
 
     //read last bytes into buffer to get PostScript
     int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
-    file.seek(size - readSize);
     ByteBuffer buffer = ByteBuffer.allocate(readSize);
     assert buffer.position() == 0;
-    file.readFully(buffer.array(), buffer.arrayOffset(), readSize);
+    OrcUtils.readFromStream(file, (size - readSize),
+        buffer.array(), buffer.arrayOffset(), readSize);
     buffer.position(0);
 
     //read the PostScript
@@ -495,10 +494,9 @@ private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
     int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
     if (extra > 0) {
       //more bytes need to be read, seek back to the right place and read extra bytes
-      file.seek(size - readSize - extra);
       ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
-      file.readFully(extraBuf.array(),
-          extraBuf.arrayOffset() + extraBuf.position(), extra);
+      OrcUtils.readFromStream(file, (size - readSize - extra), extraBuf.array(),
+          extraBuf.arrayOffset() + extraBuf.position(), extra);
       extraBuf.position(extra);
       //append with already read bytes
       extraBuf.put(buffer);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
index ded3979..a4b028a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
@@ -246,8 +246,8 @@ static DiskRangeList readDiskRanges(FSDataInputStream file,
       }
       int len = (int) (range.getEnd() - range.getOffset());
       long off = range.getOffset();
-      file.seek(base + off);
       if (zcr != null) {
+        file.seek(base + off);
         boolean hasReplaced = false;
         while (len > 0) {
           ByteBuffer partial = zcr.readBuffer(len, false);
@@ -264,12 +264,13 @@ static DiskRangeList readDiskRanges(FSDataInputStream file,
           off += read;
         }
       } else if (doForceDirect) {
+        file.seek(base + off);
         ByteBuffer directBuf = ByteBuffer.allocateDirect(len);
         readDirect(file, len, directBuf);
         range = range.replaceSelfWith(new BufferChunk(directBuf, range.getOffset()));
       } else {
         byte[] buffer = new byte[len];
-        file.readFully(buffer, 0, buffer.length);
+        OrcUtils.readFromStream(file, (base + off), buffer, 0, buffer.length);
         range = range.replaceSelfWith(new BufferChunk(ByteBuffer.wrap(buffer), range.getOffset()));
       }
       range = range.next;