diff --git a/orc/src/java/org/apache/orc/impl/ReaderImpl.java b/orc/src/java/org/apache/orc/impl/ReaderImpl.java index 2da590e..1dd5e43 100644 --- a/orc/src/java/org/apache/orc/impl/ReaderImpl.java +++ b/orc/src/java/org/apache/orc/impl/ReaderImpl.java @@ -345,6 +345,7 @@ public ReaderImpl(Path path, OrcFile.ReaderOptions options) throws IOException { options.getMaxLength()); this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer; } + options.fileMetaInfo(footerMetaData); MetaInfoObjExtractor rInfo = new MetaInfoObjExtractor(footerMetaData.compressionType, footerMetaData.bufferSize, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 087207b..185852c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -467,7 +467,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf, } try { OrcFile.createReader(file.getPath(), - OrcFile.readerOptions(conf).filesystem(fs)); + OrcFile.readerOptions(conf).filesystem(fs).maxLength(file.getLen())); } catch (IOException e) { return false; } @@ -1391,7 +1391,7 @@ private void populateAndCacheStripeDetails() throws IOException { private Reader createOrcReader() throws IOException { return OrcFile.createReader(file.getPath(), - OrcFile.readerOptions(context.conf).filesystem(fs)); + OrcFile.readerOptions(context.conf).filesystem(fs).maxLength(file.getLen())); } private long computeProjectionSize(List types, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 3a2e7d8..0b40fef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -56,10 +56,10 @@ //serialized footer - Keeping this around for use by getFileMetaInfo() // will help avoid cpu cycles spend in deserializing at cost of increased // memory footprint. - private final ByteBuffer footerByteBuffer; + private ByteBuffer footerByteBuffer; // Same for metastore cache - maintains the same background buffer, but includes postscript. // This will only be set if the file footer/metadata was read from disk. - private final ByteBuffer footerMetaAndPsBuffer; + private ByteBuffer footerMetaAndPsBuffer; @Override public ObjectInspector getObjectInspector() { @@ -89,18 +89,15 @@ public ReaderImpl(Path path, FileMetadata fileMetadata = options.getFileMetadata(); if (fileMetadata != null) { this.inspector = OrcStruct.createObjectInspector(0, fileMetadata.getTypes()); - this.footerByteBuffer = null; // not cached and not needed here - this.footerMetaAndPsBuffer = null; } else { FileMetaInfo footerMetaData; if (options.getFileMetaInfo() != null) { footerMetaData = options.getFileMetaInfo(); - this.footerMetaAndPsBuffer = null; } else { footerMetaData = extractMetaInfoFromFooter(fileSystem, path, options.getMaxLength()); - this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer; } + this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer; MetaInfoObjExtractor rInfo = new MetaInfoObjExtractor(footerMetaData.compressionType, footerMetaData.bufferSize, diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 52098ae..edaecb3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Properties; @@ -47,7 +48,9 @@ import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; @@ -600,6 +603,61 @@ public void testACIDSplitStrategy() throws Exception { } @Test + public void testSplitGenReadOps() throws Exception { + MockFileSystem fs = new MockFileSystem(conf); + conf.set("mapred.input.dir", "mock:///mocktable"); + conf.set("fs.defaultFS", "mock:///"); + conf.set("fs.mock.impl", MockFileSystem.class.getName()); + MockPath mockPath = new MockPath(fs, "mock:///mocktable"); + StructObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = (StructObjectInspector) + ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class, + ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + Writer writer = + OrcFile.createWriter(new Path(mockPath + "/0_0"), + OrcFile.writerOptions(conf).blockPadding(false) + .bufferSize(1024).inspector(inspector)); + for(int i=0; i < 10; ++i) { + writer.addRow(new MyRow(i, 2*i)); + } + writer.close(); + + writer = OrcFile.createWriter(new Path(mockPath + "/0_1"), + OrcFile.writerOptions(conf).blockPadding(false) + .bufferSize(1024).inspector(inspector)); + for(int i=0; i < 10; ++i) { + writer.addRow(new MyRow(i, 2*i)); + } + writer.close(); + + int readOpsBefore = -1; + for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { + if (statistics.getScheme().equalsIgnoreCase("mock")) { + readOpsBefore = statistics.getReadOps(); + } + } + assertTrue("MockFS has stats. Read ops not expected to be -1", readOpsBefore != -1); + OrcInputFormat orcInputFormat = new OrcInputFormat(); + InputSplit[] splits = orcInputFormat.getSplits(conf, 2); + int readOpsDelta = -1; + for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { + if (statistics.getScheme().equalsIgnoreCase("mock")) { + readOpsDelta = statistics.getReadOps() - readOpsBefore; + } + } + // call-1: listLocatedStatus - mock:/mocktable + // call-2: open - mock:/mocktable/0_0 + // call-3: open - mock:/mocktable/0_0 + assertEquals(3, readOpsDelta); + + assertEquals(2, splits.length); + // revert back to local fs + conf.set("fs.defaultFS", "file:///"); + } + + @Test public void testBIStrategySplitBlockBoundary() throws Exception { conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI"); OrcInputFormat.Context context = new OrcInputFormat.Context(conf); @@ -788,6 +846,14 @@ public MockBlock(String... hosts) { this.hosts = hosts; } + public void setOffset(int offset) { + this.offset = offset; + } + + public void setLength(int length) { + this.length = length; + } + @Override public String toString() { StringBuilder buffer = new StringBuilder(); @@ -926,6 +992,9 @@ public void close() throws IOException { DataOutputBuffer buf = (DataOutputBuffer) getWrappedStream(); file.length = buf.getLength(); file.content = new byte[file.length]; + MockBlock block = new MockBlock("host1"); + block.setLength(file.length); + setBlocks(block); System.arraycopy(buf.getData(), 0, file.content, 0, file.length); } @@ -941,6 +1010,7 @@ public String toString() { // statics for when the mock fs is created via FileSystem.get private static String blockedUgi = null; private final static List globalFiles = new ArrayList(); + protected Statistics statistics; public MockFileSystem() { // empty @@ -949,11 +1019,13 @@ public MockFileSystem() { @Override public void initialize(URI uri, Configuration conf) { setConf(conf); + statistics = getStatistics("mock", getClass()); } public MockFileSystem(Configuration conf, MockFile... files) { setConf(conf); this.files.addAll(Arrays.asList(files)); + statistics = getStatistics("mock", getClass()); } public static void setBlockedUgi(String s) { @@ -979,6 +1051,7 @@ public URI getUri() { @Override public FSDataInputStream open(Path path, int i) throws IOException { + statistics.incrementReadOps(1); checkAccess(); MockFile file = findFile(path); if (file != null) return new FSDataInputStream(new MockInputStream(file)); @@ -1011,6 +1084,7 @@ public FSDataOutputStream create(Path path, FsPermission fsPermission, short replication, long blockSize, Progressable progressable ) throws IOException { + statistics.incrementWriteOps(1); checkAccess(); MockFile file = findFile(path); if (file == null) { @@ -1024,6 +1098,7 @@ public FSDataOutputStream create(Path path, FsPermission fsPermission, public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable ) throws IOException { + statistics.incrementWriteOps(1); checkAccess(); return create(path, FsPermission.getDefault(), true, bufferSize, (short) 3, 256 * 1024, progressable); @@ -1031,24 +1106,68 @@ public FSDataOutputStream append(Path path, int bufferSize, @Override public boolean rename(Path path, Path path2) throws IOException { + statistics.incrementWriteOps(1); checkAccess(); return false; } @Override public boolean delete(Path path) throws IOException { + statistics.incrementWriteOps(1); checkAccess(); return false; } @Override public boolean delete(Path path, boolean b) throws IOException { + statistics.incrementWriteOps(1); checkAccess(); return false; } @Override + public RemoteIterator listLocatedStatus(final Path f) + throws IOException { + return new RemoteIterator() { + private Iterator iterator = listLocatedFileStatuses(f).iterator(); + + @Override + public boolean hasNext() throws IOException { + return iterator.hasNext(); + } + + @Override + public LocatedFileStatus next() throws IOException { + return iterator.next(); + } + }; + } + + private List listLocatedFileStatuses(Path path) throws IOException { + statistics.incrementReadOps(1); + checkAccess(); + path = path.makeQualified(this); + List result = new ArrayList<>(); + String pathname = path.toString(); + String pathnameAsDir = pathname + "/"; + Set dirs = new TreeSet(); + MockFile file = findFile(path); + if (file != null) { + result.add(createLocatedStatus(file)); + return result; + } + findMatchingLocatedFiles(files, pathnameAsDir, dirs, result); + findMatchingLocatedFiles(globalFiles, pathnameAsDir, dirs, result); + // for each directory add it once + for(String dir: dirs) { + result.add(createLocatedDirectory(new MockPath(this, pathnameAsDir + dir))); + } + return result; + } + + @Override public FileStatus[] listStatus(Path path) throws IOException { + statistics.incrementReadOps(1); checkAccess(); path = path.makeQualified(this); List result = new ArrayList(); @@ -1084,6 +1203,23 @@ private void findMatchingFiles( } } + private void findMatchingLocatedFiles( + List files, String pathnameAsDir, Set dirs, List result) + throws IOException { + for (MockFile file: files) { + String filename = file.path.toString(); + if (filename.startsWith(pathnameAsDir)) { + String tail = filename.substring(pathnameAsDir.length()); + int nextSlash = tail.indexOf('/'); + if (nextSlash > 0) { + dirs.add(tail.substring(0, nextSlash)); + } else { + result.add(createLocatedStatus(file)); + } + } + } + } + @Override public void setWorkingDirectory(Path path) { workingDir = path; @@ -1096,6 +1232,7 @@ public Path getWorkingDirectory() { @Override public boolean mkdirs(Path path, FsPermission fsPermission) { + statistics.incrementWriteOps(1); return false; } @@ -1110,8 +1247,21 @@ private FileStatus createDirectory(Path dir) { FsPermission.createImmutable((short) 755), "owen", "group", dir); } + private LocatedFileStatus createLocatedStatus(MockFile file) throws IOException { + FileStatus fileStatus = createStatus(file); + return new LocatedFileStatus(fileStatus, + getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false)); + } + + private LocatedFileStatus createLocatedDirectory(Path dir) throws IOException { + FileStatus fileStatus = createDirectory(dir); + return new LocatedFileStatus(fileStatus, + getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false)); + } + @Override public FileStatus getFileStatus(Path path) throws IOException { + statistics.incrementReadOps(1); checkAccess(); path = path.makeQualified(this); String pathnameAsDir = path.toString() + "/"; @@ -1133,6 +1283,15 @@ public FileStatus getFileStatus(Path path) throws IOException { @Override public BlockLocation[] getFileBlockLocations(FileStatus stat, long start, long len) throws IOException { + return getFileBlockLocationsImpl(stat, start, len, true); + } + + private BlockLocation[] getFileBlockLocationsImpl(final FileStatus stat, final long start, + final long len, + final boolean updateStats) throws IOException { + if (updateStats) { + statistics.incrementReadOps(1); + } checkAccess(); List result = new ArrayList(); MockFile file = findFile(stat.getPath());