diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 9ac34b7..64abe91 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -397,7 +397,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf, } try { OrcFile.createReader(file.getPath(), - OrcFile.readerOptions(conf).filesystem(fs)); + OrcFile.readerOptions(conf).filesystem(fs).maxLength(file.getLen())); } catch (IOException e) { return false; } @@ -1002,7 +1002,7 @@ OrcSplit createSplit(long offset, long length, private void populateAndCacheStripeDetails() throws IOException { Reader orcReader = OrcFile.createReader(file.getPath(), - OrcFile.readerOptions(context.conf).filesystem(fs)); + OrcFile.readerOptions(context.conf).filesystem(fs).maxLength(file.getLen())); if (fileInfo != null) { stripes = fileInfo.stripeInfos; fileMetaInfo = fileInfo.fileMetaInfo; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index fa32bf6..3e7565e 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Properties; @@ -47,7 +48,9 @@ import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; @@ -661,6 +664,14 @@ public MockBlock(String... hosts) { this.hosts = hosts; } + public void setOffset(int offset) { + this.offset = offset; + } + + public void setLength(int length) { + this.length = length; + } + @Override public String toString() { StringBuilder buffer = new StringBuilder(); @@ -799,6 +810,9 @@ public void close() throws IOException { DataOutputBuffer buf = (DataOutputBuffer) getWrappedStream(); file.length = buf.getLength(); file.content = new byte[file.length]; + MockBlock block = new MockBlock("host1"); + block.setLength(file.length); + setBlocks(block); System.arraycopy(buf.getData(), 0, file.content, 0, file.length); } @@ -811,6 +825,7 @@ public String toString() { public static class MockFileSystem extends FileSystem { final List files = new ArrayList(); Path workingDir = new Path("/"); + protected Statistics statistics; @SuppressWarnings("unused") public MockFileSystem() { @@ -820,11 +835,13 @@ public MockFileSystem() { @Override public void initialize(URI uri, Configuration conf) { setConf(conf); + statistics = getStatistics("mock", getClass()); } public MockFileSystem(Configuration conf, MockFile... files) { setConf(conf); this.files.addAll(Arrays.asList(files)); + statistics = getStatistics("mock", getClass()); } void clear() { @@ -842,6 +859,7 @@ public URI getUri() { @Override public FSDataInputStream open(Path path, int i) throws IOException { + statistics.incrementReadOps(1); for(MockFile file: files) { if (file.path.equals(path)) { return new FSDataInputStream(new MockInputStream(file)); @@ -856,6 +874,7 @@ public FSDataOutputStream create(Path path, FsPermission fsPermission, short replication, long blockSize, Progressable progressable ) throws IOException { + statistics.incrementWriteOps(1); MockFile file = null; for(MockFile currentFile: files) { if (currentFile.path.equals(path)) { @@ -874,27 +893,70 @@ public FSDataOutputStream create(Path path, FsPermission fsPermission, public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable ) throws IOException { + statistics.incrementWriteOps(1); return create(path, FsPermission.getDefault(), true, bufferSize, (short) 3, 256 * 1024, progressable); } @Override public boolean rename(Path path, Path path2) throws IOException { + statistics.incrementWriteOps(1); return false; } @Override public boolean delete(Path path) throws IOException { + statistics.incrementWriteOps(1); return false; } @Override public boolean delete(Path path, boolean b) throws IOException { + statistics.incrementWriteOps(1); return false; } @Override + public RemoteIterator listLocatedStatus(final Path f) + throws IOException { + return new RemoteIterator() { + private Iterator iterator = listLocatedFileStatuses(f).iterator(); + + @Override + public boolean hasNext() throws IOException { + return iterator.hasNext(); + } + + @Override + public LocatedFileStatus next() throws IOException { + return iterator.next(); + } + }; + } + + private List listLocatedFileStatuses(Path path) throws IOException { + statistics.incrementReadOps(1); + path = path.makeQualified(this); + List result = new ArrayList<>(); + String pathname = path.toString(); + String pathnameAsDir = pathname + "/"; + Set dirs = new TreeSet(); + MockFile file = findFile(path); + if (file != null) { + result.add(createLocatedStatus(file)); + return result; + } + findMatchingLocatedFiles(files, pathnameAsDir, dirs, result); + // for each directory add it once + for (String dir : dirs) { + result.add(createLocatedDirectory(new MockPath(this, pathnameAsDir + dir))); + } + return result; + } + + @Override public FileStatus[] listStatus(Path path) throws IOException { + statistics.incrementReadOps(1); path = path.makeQualified(this); List result = new ArrayList(); String pathname = path.toString(); @@ -921,6 +983,32 @@ public boolean delete(Path path, boolean b) throws IOException { return result.toArray(new FileStatus[result.size()]); } + private MockFile findFile(Path path) { + for (MockFile file: files) { + if (file.path.equals(path)) { + return file; + } + } + return null; + } + + private void findMatchingLocatedFiles( + List files, String pathnameAsDir, Set dirs, List result) + throws IOException { + for (MockFile file : files) { + String filename = file.path.toString(); + if (filename.startsWith(pathnameAsDir)) { + String tail = filename.substring(pathnameAsDir.length()); + int nextSlash = tail.indexOf('/'); + if (nextSlash > 0) { + dirs.add(tail.substring(0, nextSlash)); + } else { + result.add(createLocatedStatus(file)); + } + } + } + } + @Override public void setWorkingDirectory(Path path) { workingDir = path; @@ -933,9 +1021,22 @@ public Path getWorkingDirectory() { @Override public boolean mkdirs(Path path, FsPermission fsPermission) { + statistics.incrementWriteOps(1); return false; } + private LocatedFileStatus createLocatedStatus(MockFile file) throws IOException { + FileStatus fileStatus = createStatus(file); + return new LocatedFileStatus(fileStatus, + getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false)); + } + + private LocatedFileStatus createLocatedDirectory(Path dir) throws IOException { + FileStatus fileStatus = createDirectory(dir); + return new LocatedFileStatus(fileStatus, + getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false)); + } + private FileStatus createStatus(MockFile file) { return new FileStatus(file.length, false, 1, file.blockSize, 0, 0, FsPermission.createImmutable((short) 644), "owen", "group", @@ -949,6 +1050,7 @@ private FileStatus createDirectory(Path dir) { @Override public FileStatus getFileStatus(Path path) throws IOException { + statistics.incrementReadOps(1); path = path.makeQualified(this); String pathnameAsDir = path.toString() + "/"; for(MockFile file: files) { @@ -962,24 +1064,30 @@ public FileStatus getFileStatus(Path path) throws IOException { } @Override - public BlockLocation[] getFileBlockLocations(FileStatus stat, - long start, long len) { + public BlockLocation[] getFileBlockLocations(FileStatus stat, long start, long len) throws IOException { + return getFileBlockLocationsImpl(stat, start, len, true); + } + + private BlockLocation[] getFileBlockLocationsImpl(FileStatus stat, long start, long len, boolean updateStats) + throws IOException { + if (updateStats) { + statistics.incrementReadOps(1); + } List result = new ArrayList(); - for(MockFile file: files) { - if (file.path.equals(stat.getPath())) { - for(MockBlock block: file.blocks) { - if (OrcInputFormat.SplitGenerator.getOverlap(block.offset, - block.length, start, len) > 0) { - String[] topology = new String[block.hosts.length]; - for(int i=0; i < topology.length; ++i) { - topology[i] = "/rack/ " + block.hosts[i]; - } - result.add(new BlockLocation(block.hosts, block.hosts, - topology, block.offset, block.length)); + MockFile file = findFile(stat.getPath()); + if (file != null) { + for(MockBlock block: file.blocks) { + if (OrcInputFormat.SplitGenerator.getOverlap(block.offset, + block.length, start, len) > 0) { + String[] topology = new String[block.hosts.length]; + for(int i=0; i < topology.length; ++i) { + topology[i] = "/rack/ " + block.hosts[i]; } + result.add(new BlockLocation(block.hosts, block.hosts, + topology, block.offset, block.length)); } - return result.toArray(new BlockLocation[result.size()]); } + return result.toArray(new BlockLocation[result.size()]); } return new BlockLocation[0]; } @@ -2124,4 +2232,58 @@ public void testSplitEliminationNullStats() throws Exception { assertEquals(0, splits.length); } + @Test + public void testSplitGenReadOps() throws Exception { + MockFileSystem fs = new MockFileSystem(conf); + conf.set("mapred.input.dir", "mock:///mocktable"); + conf.set("fs.defaultFS", "mock:///"); + conf.set("fs.mock.impl", MockFileSystem.class.getName()); + MockPath mockPath = new MockPath(fs, "mock:///mocktable"); + StructObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = (StructObjectInspector) + ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class, + ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + Writer writer = + OrcFile.createWriter(new Path(mockPath + "/0_0"), + OrcFile.writerOptions(conf).blockPadding(false) + .bufferSize(1024).inspector(inspector)); + for (int i = 0; i < 10; ++i) { + writer.addRow(new MyRow(i, 2 * i)); + } + writer.close(); + + writer = OrcFile.createWriter(new Path(mockPath + "/0_1"), + OrcFile.writerOptions(conf).blockPadding(false) + .bufferSize(1024).inspector(inspector)); + for (int i = 0; i < 10; ++i) { + writer.addRow(new MyRow(i, 2 * i)); + } + writer.close(); + + int readOpsBefore = -1; + for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { + if (statistics.getScheme().equalsIgnoreCase("mock")) { + readOpsBefore = statistics.getReadOps(); + } + } + assertTrue("MockFS has stats. Read ops not expected to be -1", readOpsBefore != -1); + OrcInputFormat orcInputFormat = new OrcInputFormat(); + InputSplit[] splits = orcInputFormat.getSplits(conf, 2); + int readOpsDelta = -1; + for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { + if (statistics.getScheme().equalsIgnoreCase("mock")) { + readOpsDelta = statistics.getReadOps() - readOpsBefore; + } + } + // call-1: listLocatedStatus - mock:/mocktable + // call-2: open - mock:/mocktable/0_0 + // call-3: open - mock:/mocktable/0_1 + assertEquals(3, readOpsDelta); + + assertEquals(2, splits.length); + // revert back to local fs + conf.set("fs.defaultFS", "file:///"); + } }