diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
index 01b07f2..ded3979 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
 import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
+import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
 import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
@@ -46,6 +47,7 @@
  * Stateless methods shared between RecordReaderImpl and EncodedReaderImpl.
  */
 public class RecordReaderUtils {
+  private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
   static boolean[] findPresentStreamsByColumn(
       List<OrcProto.Stream> streamList, List<OrcProto.Type> types) {
     boolean[] hasNull = new boolean[types.size()];
@@ -263,7 +265,7 @@ static DiskRangeList readDiskRanges(FSDataInputStream file,
         }
       } else if (doForceDirect) {
         ByteBuffer directBuf = ByteBuffer.allocateDirect(len);
-        readDirect(file, len, directBuf, true);
+        readDirect(file, len, directBuf);
         range = range.replaceSelfWith(new BufferChunk(directBuf, range.getOffset()));
       } else {
         byte[] buffer = new byte[len];
@@ -276,13 +278,13 @@ static DiskRangeList readDiskRanges(FSDataInputStream file,
   }

   public static void readDirect(FSDataInputStream file,
-      int len, ByteBuffer directBuf, boolean doSetLimit) throws IOException {
+      int len, ByteBuffer directBuf) throws IOException {
     // TODO: HDFS API is a mess, so handle all kinds of cases.
     // Before 2.7, read() also doesn't adjust position correctly, so track it separately.
     int pos = directBuf.position(), startPos = pos, endPos = pos + len;
     try {
       while (pos < endPos) {
-        int count = file.read(directBuf);
+        int count = SHIMS.readByteBuffer(file, directBuf);
         if (count < 0) throw new EOFException();
         assert count != 0 : "0-length read: " + (endPos - pos) + "@" + (pos - startPos);
         pos += count;
@@ -298,9 +300,7 @@ public static void readDirect(FSDataInputStream file,
       directBuf.put(buffer);
     }
     directBuf.position(startPos);
-    if (doSetLimit) {
-      directBuf.limit(startPos + len);
-    }
+    directBuf.limit(startPos + len);
   }

diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index 2520d2a..0727945 100644
--- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -24,6 +24,7 @@
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -697,4 +698,25 @@ public HdfsEncryptionShim createHdfsEncryptionShim(FileSystem fs, Configuration
   public Path getPathWithoutSchemeAndAuthority(Path path) {
     return path;
   }
+
+  @Override
+  public int readByteBuffer(FSDataInputStream file, ByteBuffer dest) throws IOException {
+    // Inefficient for direct buffers; only here for compat.
+    int pos = dest.position();
+    if (dest.hasArray()) {
+      int result = file.read(dest.array(), dest.arrayOffset(), dest.remaining());
+      if (result > 0) {
+        dest.position(pos + result);
+      }
+      return result;
+    } else {
+      byte[] arr = new byte[dest.remaining()];
+      int result = file.read(arr, 0, arr.length);
+      if (result > 0) {
+        dest.put(arr, 0, result);
+        dest.position(pos + result);
+      }
+      return result;
+    }
+  }
 }
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 2997286..9168fba 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -24,6 +24,7 @@
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.security.AccessControlException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
@@ -1340,4 +1341,15 @@ private int compareKeyStrength(String keyname1, String keyname2) throws IOExcept
   public Path getPathWithoutSchemeAndAuthority(Path path) {
     return Path.getPathWithoutSchemeAndAuthority(path);
   }
+
+  @Override
+  public int readByteBuffer(FSDataInputStream file, ByteBuffer dest) throws IOException {
+    int pos = dest.position();
+    int result = file.read(dest);
+    if (result > 0) {
+      // Ensure this explicitly since versions before 2.7 read doesn't do it.
+      dest.position(pos + result);
+    }
+    return result;
+  }
 }
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 9d076da..08bab90 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -708,4 +708,13 @@ public void deleteKey(String keyName) throws IOException {
   public HdfsEncryptionShim createHdfsEncryptionShim(FileSystem fs, Configuration conf) throws IOException;

   public Path getPathWithoutSchemeAndAuthority(Path path);
+
+  /**
+   * Reads data into ByteBuffer.
+   * @param file File.
+   * @param dest Buffer.
+   * @return Number of bytes read, just like file.read. If any bytes were read, dest position
+   *         will be set to old position + number of bytes read.
+   */
+  int readByteBuffer(FSDataInputStream file, ByteBuffer dest) throws IOException;
 }
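
For reference, the calling pattern the new shim method is designed for is roughly the loop readDirect now uses. The sketch below is illustrative only and not part of the patch: the class name ReadByteBufferExample and the helper readFullyDirect are invented for the example, and it assumes the Hive shims and Hadoop client jars are on the classpath. It relies only on the contract documented in HadoopShims.readByteBuffer: the return value behaves like FSDataInputStream.read, and whenever bytes are read the buffer position advances by that count, regardless of the Hadoop version behind the shim.

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;

public class ReadByteBufferExample {
  private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();

  /** Reads exactly len bytes into buf, leaving position at the start of the data read. */
  static void readFullyDirect(FSDataInputStream file, ByteBuffer buf, int len)
      throws IOException {
    int start = buf.position(), end = start + len;
    buf.limit(end); // never read past the requested range
    while (buf.position() < end) {
      // The shim advances the buffer position by the bytes read, even on
      // Hadoop versions where read(ByteBuffer) itself does not.
      int count = SHIMS.readByteBuffer(file, buf);
      if (count < 0) {
        throw new EOFException("EOF after " + (buf.position() - start) + " of " + len + " bytes");
      }
      if (count == 0) {
        throw new IOException("0-length read at " + (buf.position() - start));
      }
    }
    buf.position(start); // rewind so the caller sees the len bytes just read
  }
}

The hasArray branch in the Hadoop20SShims fallback exists because pre-ByteBuffer read APIs only accept byte arrays, so a heap buffer can be read into directly while a direct buffer needs a temporary array copy; the 0.23 shim can delegate to read(ByteBuffer) and only has to fix up the position for Hadoop versions before 2.7.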