diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index 9bfe268..6d47532 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -47,6 +47,8 @@
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -1486,6 +1488,7 @@ public static void readOrcByteArrays(InStream stream, IntegerReader lengths,
    */
   protected static class StringDirectTreeReader extends TreeReader {
     protected InStream stream;
+    protected TextReaderShim data;
     protected IntegerReader lengths;
     private final LongColumnVector scratchlcv;
 
@@ -1500,6 +1503,7 @@
       this.stream = data;
       if (length != null && encoding != null) {
         this.lengths = createIntegerReader(encoding, length, false, false);
+        this.data = ShimLoader.getHadoopShims().getTextReaderShim(this.stream);
       }
     }
 
@@ -1520,6 +1524,7 @@ void startStripe(Map<StreamName, InStream> streams,
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
       stream = streams.get(name);
+      data = ShimLoader.getHadoopShims().getTextReaderShim(this.stream);
       lengths = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
           streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false, false);
     }
@@ -1534,6 +1539,7 @@ void seek(PositionProvider[] index) throws IOException {
     public void seek(PositionProvider index) throws IOException {
       super.seek(index);
       stream.seek(index);
+      // don't seek data stream
       lengths.seek(index);
     }
 
@@ -1548,17 +1554,7 @@ Object next(Object previous) throws IOException {
         result = (Text) previous;
       }
       int len = (int) lengths.next();
-      int offset = 0;
-      byte[] bytes = new byte[len];
-      while (len > 0) {
-        int written = stream.read(bytes, offset, len);
-        if (written < 0) {
-          throw new EOFException("Can't finish byte read from " + stream);
-        }
-        len -= written;
-        offset += written;
-      }
-      result.set(bytes);
+      data.read(result, len);
     }
     return result;
   }
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 9eae0ac..f294796 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hive.shims;
 
+import java.io.DataInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
@@ -67,7 +69,9 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -108,10 +112,12 @@
   HadoopShims.MiniDFSShim cluster = null;
   final boolean zeroCopy;
   final boolean storagePolicy;
+  final boolean fastread;
 
   public Hadoop23Shims() {
     boolean zcr = false;
     boolean storage = false;
+    boolean fastread = false;
     try {
       Class.forName("org.apache.hadoop.fs.CacheFlag", false,
           ShimLoader.class.getClassLoader());
@@ -128,8 +134,18 @@ public Hadoop23Shims() {
       } catch (ClassNotFoundException ce) {
       }
     }
+
+    if (storage) {
+      for (Method m : Text.class.getMethods()) {
+        if ("readWithKnownLength".equals(m.getName())) {
+          fastread = true;
+        }
+      }
+    }
+
     this.storagePolicy = storage;
     this.zeroCopy = zcr;
     this.fastread = fastread;
   }
 
   @Override
@@ -1348,8 +1364,31 @@ public int readByteBuffer(FSDataInputStream file, ByteBuffer dest) throws IOExce
     }
     return result;
   }
+
+  @Override
   public void addDelegationTokens(FileSystem fs, Credentials cred, String uname) throws IOException {
     // Use method addDelegationTokens instead of getDelegationToken to get all the tokens including KMS.
     fs.addDelegationTokens(uname, cred);
   }
+
+  private final class FastTextReaderShim implements TextReaderShim {
+    private final DataInputStream din;
+
+    public FastTextReaderShim(InputStream in) {
+      this.din = new DataInputStream(in);
+    }
+
+    @Override
+    public void read(Text txt, int len) throws IOException {
+      txt.readWithKnownLength(din, len);
+    }
+  }
+
+  @Override
+  public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
+    if (!fastread) {
+      return super.getTextReaderShim(in);
+    }
+    return new FastTextReaderShim(in);
+  }
 }
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 74785e5..5c6a4ca 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.shims;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -49,6 +50,7 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobProfile;
@@ -731,4 +733,24 @@ public void deleteKey(String keyName) throws IOException {
    * @throws IOException If an error occurred on adding the token.
    */
   public void addDelegationTokens(FileSystem fs, Credentials cred, String uname) throws IOException;
+
+  /**
+   * Read data into a Text object in the fastest way possible.
+   */
+  public interface TextReaderShim {
+    /**
+     * Read {@code len} bytes from the wrapped stream into {@code txt}.
+     * @param txt the Text object to read into
+     * @param len the number of bytes to read
+     * @throws IOException if the stream ends before len bytes could be read
+     */
+    void read(Text txt, int len) throws IOException;
+  }
+
+  /**
+   * Wrap a TextReaderShim around an input stream. The shim will not buffer
+   * reads from the underlying stream and will consume only the bytes
+   * required to satisfy TextReaderShim.read() calls.
+   */
+  public TextReaderShim getTextReaderShim(InputStream input) throws IOException;
 }
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
index 89d7798..c6b7c9d 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
@@ -19,7 +19,9 @@
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Constructor;
 import java.net.URI;
 import java.security.AccessControlException;
@@ -40,6 +42,7 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -392,4 +395,33 @@ public void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action)
   @Override
   abstract public void addDelegationTokens(FileSystem fs, Credentials cred, String uname)
       throws IOException;
+
+  private final class BasicTextReaderShim implements TextReaderShim {
+    private final InputStream in;
+
+    public BasicTextReaderShim(InputStream in) {
+      this.in = in;
+    }
+
+    @Override
+    public void read(Text txt, int len) throws IOException {
+      int offset = 0;
+      byte[] bytes = new byte[len];
+      while (len > 0) {
+        int n = in.read(bytes, offset, len);
+        if (n < 0) {
+          throw new EOFException("Can't finish read from " + in + ": read "
+              + offset + " bytes out of " + bytes.length);
+        }
+        len -= n;
+        offset += n;
+      }
+      txt.set(bytes);
+    }
+  }
+
+  @Override
+  public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
+    return new BasicTextReaderShim(in);
+  }
 }
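---

Note (not part of the patch): the sketch below contrasts the two read paths the shim selects between. Text.readWithKnownLength is the Hadoop 2.7+ API that the reflection check in Hadoop23Shims probes for; the class TextReadSketch and its method names are made up for illustration, and the byte-counting loop mirrors the BasicTextReaderShim fallback added above. Assumes hadoop-common on the classpath.

// TextReadSketch.java -- illustrative only, not part of the patch.
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.Text;

public class TextReadSketch {

  // Fallback path (BasicTextReaderShim): allocate a fresh byte[] per value,
  // fill it with a read loop, then copy it again into the Text.
  static void basicRead(InputStream in, Text txt, int len) throws IOException {
    int offset = 0;
    byte[] bytes = new byte[len];
    while (len > 0) {
      int n = in.read(bytes, offset, len);
      if (n < 0) {
        throw new EOFException("EOF after " + offset + " of " + bytes.length + " bytes");
      }
      len -= n;
      offset += n;
    }
    txt.set(bytes); // second copy: bytes[] -> Text's internal buffer
  }

  // Fast path (FastTextReaderShim): Text reads straight into its own buffer,
  // skipping the intermediate byte[] allocation and the extra copy.
  static void fastRead(DataInputStream din, Text txt, int len) throws IOException {
    txt.readWithKnownLength(din, len);
  }

  public static void main(String[] args) throws IOException {
    byte[] payload = "hello orc".getBytes(StandardCharsets.UTF_8);
    Text txt = new Text();

    basicRead(new ByteArrayInputStream(payload), txt, payload.length);
    System.out.println(txt); // hello orc

    fastRead(new DataInputStream(new ByteArrayInputStream(payload)), txt, payload.length);
    System.out.println(txt); // hello orc
  }
}

The DataInputStream wrapper in FastTextReaderShim adds no buffering of its own, which is what the getTextReaderShim javadoc promises: StringDirectTreeReader seeks the raw InStream directly (hence the "don't seek data stream" comment), so any read-ahead buffering inside the shim would leave it out of sync with the stream position.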