diff --git build.properties build.properties
index 3009bba..d53941b 100644
--- build.properties
+++ build.properties
@@ -29,7 +29,7 @@
 javac.args.warnings=
 hadoop-0.20.version=0.20.2
 hadoop-0.20S.version=1.1.2
-hadoop-0.23.version=2.1.0-beta
+hadoop-0.23.version=2.1.2-SNAPSHOT
 hadoop.version=${hadoop-0.20.version}
 hadoop.security.version=${hadoop-0.20S.version}
 # Used to determine which set of Hadoop artifacts we depend on.
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index e05b9e4..35005ee 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -18,14 +18,15 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import java.io.IOException;
-
 /**
  * Contains factory methods to read or write ORC files.
  */
@@ -125,6 +126,11 @@ public static Reader createReader(FileSystem fs, Path path
     return new ReaderImpl(fs, path);
   }
 
+  public static Reader createReader(FileSystem fs, Path path, FileMetaInfo fileMetaInfo)
+      throws IOException {
+    return new ReaderImpl(fs, path, fileMetaInfo);
+  }
+
   /**
    * Options for creating ORC file writers.
    */
@@ -303,4 +309,5 @@ MemoryManager getMemoryManager(Configuration conf) {
     }
     return memoryManager;
   }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 13cd0cd..cf69f20 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -237,13 +238,23 @@ public static SearchArgument createSarg(List<OrcProto.Type> types, Configuration
           reporter);
       return (RecordReader) vorr;
     }
-
-    FileSplit fileSplit = (FileSplit) inputSplit;
-    Path path = fileSplit.getPath();
+    FileSplit fSplit = (FileSplit)inputSplit;
+    reporter.setStatus(fSplit.toString());
+    Path path = fSplit.getPath();
     FileSystem fs = path.getFileSystem(conf);
-    reporter.setStatus(fileSplit.toString());
-    return new OrcRecordReader(OrcFile.createReader(fs, path), conf,
-        fileSplit.getStart(), fileSplit.getLength());
+    Reader reader = null;
+
+    if(!(fSplit instanceof OrcSplit)){
+      //If CombineHiveInputFormat is used, it works with FileSplit and not OrcSplit
+      reader = OrcFile.createReader(fs, path);
+    } else {
+      //We have OrcSplit, which has footer metadata cached, so use the appropriate reader
+      //constructor
+      OrcSplit orcSplit = (OrcSplit) fSplit;
+      FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
+      reader = OrcFile.createReader(fs, path, fMetaInfo);
+    }
+    return new OrcRecordReader(reader, conf, fSplit.getStart(), fSplit.getLength());
   }
 
   @Override
@@ -301,7 +312,7 @@ private boolean isVectorMode(Configuration conf) {
    */
   static class Context {
     private final ExecutorService threadPool = Executors.newFixedThreadPool(10);
-    private final List<FileSplit> splits = new ArrayList<FileSplit>(10000);
+    private final List<OrcSplit> splits = new ArrayList<OrcSplit>(10000);
     private final List<Throwable> errors = new ArrayList<Throwable>();
     private final HadoopShims shims = ShimLoader.getHadoopShims();
     private final Configuration conf;
@@ -330,7 +341,7 @@ int getSchedulers() {
      * the back.
      * @result the Nth file split
      */
-    FileSplit getResult(int index) {
+    OrcSplit getResult(int index) {
       if (index >= 0) {
         return splits.get(index);
       } else {
@@ -476,9 +487,10 @@ static long getOverlap(long offset1, long length1,
      * are written with large block sizes.
      * @param offset the start of the split
      * @param length the length of the split
+     * @param fileMetaInfo file metadata from footer and postscript
      * @throws IOException
      */
-    void createSplit(long offset, long length) throws IOException {
+    void createSplit(long offset, long length, FileMetaInfo fileMetaInfo) throws IOException {
       String[] hosts;
       if ((offset % blockSize) + length <= blockSize) {
         // handle the single block case
@@ -522,8 +534,8 @@ void createSplit(long offset, long length) throws IOException {
         hostList.toArray(hosts);
       }
       synchronized (context.splits) {
-        context.splits.add(new FileSplit(file.getPath(), offset, length,
-            hosts));
+        context.splits.add(new OrcSplit(file.getPath(), offset, length,
+            hosts, fileMetaInfo));
       }
     }
 
@@ -543,7 +555,7 @@ public void run() {
           // crossed a block boundary, cut the input split here.
           if (currentOffset != -1 && currentLength > context.minSize &&
               (currentOffset / blockSize != stripe.getOffset() / blockSize)) {
-            createSplit(currentOffset, currentLength);
+            createSplit(currentOffset, currentLength, orcReader.getFileMetaInfo());
             currentOffset = -1;
           }
           // if we aren't building a split, start a new one.
@@ -554,12 +566,12 @@ public void run() {
             currentLength += stripe.getLength();
           }
           if (currentLength >= context.maxSize) {
-            createSplit(currentOffset, currentLength);
+            createSplit(currentOffset, currentLength, orcReader.getFileMetaInfo());
             currentOffset = -1;
           }
         }
         if (currentOffset != -1) {
-          createSplit(currentOffset, currentLength);
+          createSplit(currentOffset, currentLength, orcReader.getFileMetaInfo());
         }
       } catch (Throwable th) {
         synchronized (context.errors) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
new file mode 100644
index 0000000..65eb510
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -0,0 +1,74 @@
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileSplit;
+
+
+/**
+ * OrcSplit. Holds the file split information plus the cached file meta info.
+ *
+ */
+public class OrcSplit extends FileSplit {
+  private Reader.FileMetaInfo fileMetaInfo;
+
+  protected OrcSplit(){
+    //The FileSplit() constructor in hadoop 0.20 and 1.x is package private, so we can't use it.
+    //This constructor is used to create the object and then call readFields(),
+    // so just pass nulls to this super constructor.
+    super(null, 0, 0, (String[])null);
+  }
+
+  public OrcSplit(Path path, long offset, long length, String[] hosts,
+      FileMetaInfo fileMetaInfo) {
+    super(path, offset, length, hosts);
+    this.fileMetaInfo = fileMetaInfo;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    //serialize path, offset, length using FileSplit
+    super.write(out);
+
+    //serialize FileMetaInfo fields
+    Text.writeString(out, fileMetaInfo.compressionType);
+    WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
+
+    //serialize FileMetaInfo field footer
+    ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
+    footerBuff.reset();
+    //write length of buffer
+    WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
+    out.write(footerBuff.array(), footerBuff.position(),
+        footerBuff.limit() - footerBuff.position());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    //deserialize path, offset, length using FileSplit
+    super.readFields(in);
+
+    //deserialize FileMetaInfo fields
+    String compressionType = Text.readString(in);
+    int bufferSize = WritableUtils.readVInt(in);
+
+    //deserialize FileMetaInfo field footer
+    int footerBuffSize = WritableUtils.readVInt(in);
+    ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
+    in.readFully(footerBuff.array(), 0, footerBuffSize);
+
+    fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, footerBuff);
+  }
+
+  public FileMetaInfo getFileMetaInfo(){
+    return fileMetaInfo;
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 37d7d86..7b676f9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -18,13 +18,13 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
 /**
  * The interface for reading ORC files.
  *
@@ -115,6 +115,28 @@
   List<OrcProto.Type> getTypes();
 
   /**
+   * FileMetaInfo - represents the file metadata stored in the footer and postscript
+   * sections of the file that is useful for the Reader implementation.
+   *
+   */
+  class FileMetaInfo{
+    final String compressionType;
+    final int bufferSize;
+    final ByteBuffer footerBuffer;
+    FileMetaInfo(String compressionType, int bufferSize, ByteBuffer footerBuffer){
+      this.compressionType = compressionType;
+      this.bufferSize = bufferSize;
+      this.footerBuffer = footerBuffer;
+    }
+  }
+
+  /**
+   * Get the metadata stored in the footer and postscript sections of the file
+   * @return FileMetaInfo object with the file metadata
+   */
+  FileMetaInfo getFileMetaInfo();
+
+  /**
    * Create a RecordReader that will scan the entire file.
    * @param include true for each column that should be included
    * @return A new RecordReader
@@ -134,6 +156,7 @@
    * @throws IOException
    * @deprecated
    */
+  @Deprecated
   RecordReader rows(long offset, long length,
                     boolean[] include) throws IOException;
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index bade3cc..6aa3a60 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -58,6 +58,11 @@
   private final ObjectInspector inspector;
   private long deserializedSize = -1;
 
+  //serialized footer - Keeping this around for use by getFileMetaInfo()
+  // will help avoid CPU cycles spent in deserializing, at the cost of an increased
+  // memory footprint.
+  private final ByteBuffer footerByteBuffer;
+
   private static final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
   private static final String CLASS_NAME = ReaderImpl.class.getName();
@@ -276,66 +281,166 @@ static void checkOrcVersion(Log log, Path path, List<Integer> version) {
     }
   }
 
+  /**
+   * Constructor that extracts the metadata information from the file footer
+   * @param fs
+   * @param path
+   * @throws IOException
+   */
   ReaderImpl(FileSystem fs, Path path) throws IOException {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.INIT_ORC_RECORD_READER);
     this.fileSystem = fs;
     this.path = path;
+
+    FileMetaInfo footerMetaData = extractMetaInfoFromFooter(fs, path);
+
+    MetaInfoObjExtractor rInfo = new MetaInfoObjExtractor(footerMetaData.compressionType,
+        footerMetaData.bufferSize, footerMetaData.footerBuffer);
+
+    this.footerByteBuffer = footerMetaData.footerBuffer;
+    this.compressionKind = rInfo.compressionKind;
+    this.codec = rInfo.codec;
+    this.bufferSize = rInfo.bufferSize;
+    this.footer = rInfo.footer;
+    this.inspector = rInfo.inspector;
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INIT_ORC_RECORD_READER);
+  }
+
+
+  /**
+   * Constructor that takes already saved footer meta information. Used for creating a RecordReader
+   * from the information saved in an InputSplit.
+   * @param fs
+   * @param path
+   * @param fMetaInfo
+   * @throws IOException
+   */
+  ReaderImpl(FileSystem fs, Path path, FileMetaInfo fMetaInfo)
+      throws IOException {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.INIT_ORC_RECORD_READER);
+    this.fileSystem = fs;
+    this.path = path;
+
+    MetaInfoObjExtractor rInfo = new MetaInfoObjExtractor(
+        fMetaInfo.compressionType,
+        fMetaInfo.bufferSize,
+        fMetaInfo.footerBuffer
+        );
+    this.footerByteBuffer = fMetaInfo.footerBuffer;
+    this.compressionKind = rInfo.compressionKind;
+    this.codec = rInfo.codec;
+    this.bufferSize = rInfo.bufferSize;
+    this.footer = rInfo.footer;
+    this.inspector = rInfo.inspector;
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INIT_ORC_RECORD_READER);
+
+  }
+
+
+  private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs, Path path) throws IOException {
     FSDataInputStream file = fs.open(path);
+
+    //read last bytes into buffer to get PostScript
     long size = fs.getFileStatus(path).getLen();
     int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
     file.seek(size - readSize);
     ByteBuffer buffer = ByteBuffer.allocate(readSize);
     file.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),
         buffer.remaining());
+
+    //read the PostScript
+    //get length of PostScript
     int psLen = buffer.get(readSize - 1) & 0xff;
     ensureOrcFooter(file, path, psLen, buffer);
     int psOffset = readSize - 1 - psLen;
     CodedInputStream in = CodedInputStream.newInstance(buffer.array(),
         buffer.arrayOffset() + psOffset, psLen);
     OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
+
     checkOrcVersion(LOG, path, ps.getVersionList());
-    int footerSize = (int) ps.getFooterLength();
-    bufferSize = (int) ps.getCompressionBlockSize();
+
+    //check compression codec
     switch (ps.getCompression()) {
       case NONE:
-        compressionKind = CompressionKind.NONE;
         break;
       case ZLIB:
-        compressionKind = CompressionKind.ZLIB;
         break;
      case SNAPPY:
-        compressionKind = CompressionKind.SNAPPY;
        break;
      case LZO:
-        compressionKind = CompressionKind.LZO;
        break;
      default:
        throw new IllegalArgumentException("Unknown compression");
    }
-    codec = WriterImpl.createCodec(compressionKind);
+
+    //get footer size
+    int footerSize = (int) ps.getFooterLength();
+
+    //check if extra bytes need to be read
     int extra = Math.max(0, psLen + 1 + footerSize - readSize);
     if (extra > 0) {
+      //more bytes need to be read, seek back to the right place and read the extra bytes
       file.seek(size - readSize - extra);
       ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
       file.readFully(extraBuf.array(),
         extraBuf.arrayOffset() + extraBuf.position(), extra);
       extraBuf.position(extra);
+      //append with the already read bytes
       extraBuf.put(buffer);
       buffer = extraBuf;
       buffer.position(0);
       buffer.limit(footerSize);
     } else {
+      //footer is already in the bytes in buffer, just adjust position, length
      buffer.position(psOffset - footerSize);
      buffer.limit(psOffset);
    }
-    InputStream instream = InStream.create("footer", new ByteBuffer[]{buffer},
-        new long[]{0L}, footerSize, codec, bufferSize);
-    footer = OrcProto.Footer.parseFrom(instream);
-    inspector = OrcStruct.createObjectInspector(0, footer.getTypesList());
     file.close();
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INIT_ORC_RECORD_READER);
+
+    //mark this position so that we can call reset() to get back to it
+    buffer.mark();
+
+    return new FileMetaInfo(
+        ps.getCompression().toString(),
+        (int) ps.getCompressionBlockSize(),
+        buffer
+        );
+  }
+
+
+  /**
+   * MetaInfoObjExtractor - has the logic to create the values for the fields in ReaderImpl
+   * from the serialized file metadata.
+   * As the fields are final, they need to be initialized in the constructor and
+   * can't be set from a helper method, so this helper class is used instead.
+   *
+   */
+  private static class MetaInfoObjExtractor{
+    final CompressionKind compressionKind;
+    final CompressionCodec codec;
+    final int bufferSize;
+    final OrcProto.Footer footer;
+    final ObjectInspector inspector;
+
+    MetaInfoObjExtractor(String codecStr, int bufferSize, ByteBuffer footerBuffer) throws IOException {
+      this.compressionKind = CompressionKind.valueOf(codecStr);
+      this.bufferSize = bufferSize;
+      this.codec = WriterImpl.createCodec(compressionKind);
+      int footerBufferSize = footerBuffer.limit() - footerBuffer.position();
+      InputStream instream = InStream.create("footer", new ByteBuffer[]{footerBuffer},
+          new long[]{0L}, footerBufferSize, codec, bufferSize);
+      this.footer = OrcProto.Footer.parseFrom(instream);
+      this.inspector = OrcStruct.createObjectInspector(0, footer.getTypesList());
+    }
+  }
+
+  public FileMetaInfo getFileMetaInfo(){
+    return new FileMetaInfo(compressionKind.toString(), bufferSize, footerByteBuffer);
+  }
+
+
+
   @Override
   public RecordReader rows(boolean[] include) throws IOException {
     return rows(0, Long.MAX_VALUE, include, null, null);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index 2209ee3..3ba17d8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.io.NullWritable;
@@ -140,11 +141,26 @@ public VectorizedOrcInputFormat() {
   public RecordReader<NullWritable, VectorizedRowBatch>
       getRecordReader(InputSplit inputSplit, JobConf conf,
           Reporter reporter) throws IOException {
-    FileSplit fileSplit = (FileSplit) inputSplit;
-    Path path = fileSplit.getPath();
+    FileSplit fSplit = (FileSplit)inputSplit;
+    reporter.setStatus(fSplit.toString());
+
+    Path path = fSplit.getPath();
     FileSystem fs = path.getFileSystem(conf);
-    reporter.setStatus(fileSplit.toString());
-    return new VectorizedOrcRecordReader(OrcFile.createReader(fs, path), conf, fileSplit);
+
+    Reader reader = null;
+
+    if(!(fSplit instanceof OrcSplit)){
+      //If CombineHiveInputFormat is used, it works with FileSplit and not OrcSplit
+      reader = OrcFile.createReader(fs, path);
+    } else {
+      //We have OrcSplit, which has footer metadata cached, so use the appropriate reader
+      //constructor
+      OrcSplit orcSplit = (OrcSplit) fSplit;
+      FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
+      reader = OrcFile.createReader(fs, path, fMetaInfo);
+    }
+
+    return new VectorizedOrcRecordReader(reader, conf, fSplit);
   }
 
   @Override
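For reviewers tracing the new path end to end, here is a minimal, hypothetical sketch of how the cached footer round-trips through OrcSplit serialization, the same way the framework ships a split to a task. Only the OrcSplit and Reader.FileMetaInfo APIs come from the patch above; the class name, the stand-in footer bytes, the path, host, compression type and buffer-size values are made up for illustration, and the sketch assumes it sits in the org.apache.hadoop.hive.ql.io.orc package so the package-private FileMetaInfo constructor and fields are visible. The buffer is mark()ed up front because OrcSplit.write() rewinds it with reset(), mirroring the mark() call added in extractMetaInfoFromFooter().

package org.apache.hadoop.hive.ql.io.orc;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;

/** Hypothetical round-trip of the cached footer through OrcSplit (de)serialization. */
public class OrcSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    // Stand-in footer bytes; in the patch these come from ReaderImpl.getFileMetaInfo().
    ByteBuffer footer = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});
    footer.mark();  // OrcSplit.write() calls reset(), so a mark must be set first.
    FileMetaInfo metaInfo = new FileMetaInfo("ZLIB", 256 * 1024, footer);

    OrcSplit split = new OrcSplit(new Path("/tmp/example.orc"), 0, 1000,
        new String[]{"host1"}, metaInfo);

    // Serialize the split the way the framework does when shipping it to a task.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    split.write(new DataOutputStream(bytes));

    // Deserialize into a fresh split; the footer metadata travels with it, so the
    // task side can build a reader without re-reading the file tail.
    OrcSplit copy = new OrcSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy.getPath() + " bufferSize=" + copy.getFileMetaInfo().bufferSize);
  }
}

On the task side this is the branch the two getRecordReader() changes take: an OrcSplit yields its FileMetaInfo via getFileMetaInfo(), which is handed to OrcFile.createReader(fs, path, fileMetaInfo), so the footer and postscript are not read again for every split of the same file.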