diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 913d3ac..8f8ec43 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -24,6 +24,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
+import java.util.NavigableMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -610,7 +612,7 @@ private FileInfo verifyCachedFileInfo(FileStatus file) {
     private final FileSystem fs;
     private final FileStatus file;
     private final long blockSize;
-    private final BlockLocation[] locations;
+    private final TreeMap<Long, BlockLocation> locations;
     private final FileInfo fileInfo;
     private List<StripeInformation> stripes;
     private ReaderImpl.FileMetaInfo fileMetaInfo;
@@ -630,7 +632,7 @@ private FileInfo verifyCachedFileInfo(FileStatus file) {
       this.file = file;
       this.blockSize = file.getBlockSize();
       this.fileInfo = fileInfo;
-      locations = SHIMS.getLocations(fs, file);
+      locations = SHIMS.getLocationsWithOffSet(fs, file);
       this.isOriginal = isOriginal;
       this.deltas = deltas;
       this.hasBase = hasBase;
@@ -641,8 +643,8 @@ Path getPath() {
     }

     void schedule() throws IOException {
-      if(locations.length == 1 && file.getLen() < context.maxSize) {
-        String[] hosts = locations[0].getHosts();
+      if(locations.size() == 1 && file.getLen() < context.maxSize) {
+        String[] hosts = ((BlockLocation)locations.values().toArray()[0]).getHosts();
         synchronized (context.splits) {
           context.splits.add(new OrcSplit(file.getPath(), 0, file.getLen(),
               hosts, fileMetaInfo, isOriginal, hasBase, deltas));
@@ -690,15 +692,22 @@ static long getOverlap(long offset1, long length1,
     void createSplit(long offset, long length,
                      ReaderImpl.FileMetaInfo fileMetaInfo) throws IOException {
       String[] hosts;
-      if ((offset % blockSize) + length <= blockSize) {
+      Map.Entry<Long, BlockLocation> startEntry = locations.floorEntry(offset);
+      BlockLocation start = startEntry.getValue();
+      if (offset + length <= start.getOffset() + start.getLength()) {
         // handle the single block case
-        hosts = locations[(int) (offset / blockSize)].getHosts();
+        hosts = start.getHosts();
       } else {
+        Map.Entry<Long, BlockLocation> endEntry = locations.floorEntry(offset + length);
+        BlockLocation end = endEntry.getValue();
+        //get the submap
+        NavigableMap<Long, BlockLocation> navigableMap = locations.subMap(startEntry.getKey(),
+            true, endEntry.getKey(), true);
         // Calculate the number of bytes in the split that are local to each
         // host.
        Map<String, LongWritable> sizes = new HashMap<String, LongWritable>();
        long maxSize = 0;
-       for(BlockLocation block: locations) {
+       for (BlockLocation block : navigableMap.values()) {
          long overlap = getOverlap(offset, length, block.getOffset(),
              block.getLength());
          if (overlap > 0) {
@@ -711,6 +720,9 @@ void createSplit(long offset, long length,
            val.set(val.get() + overlap);
            maxSize = Math.max(maxSize, val.get());
          }
+        } else {
+          throw new RuntimeException("File " + file.getPath().toString() +
+              " should have had overlap on block starting at " + block.getOffset());
        }
      }
        // filter the list of locations to those that have at least 80% of the
@@ -718,7 +730,7 @@ void createSplit(long offset, long length,
        long threshold = (long) (maxSize * MIN_INCLUDED_LOCATION);
        List<String> hostList = new ArrayList<String>();
        // build the locations in a predictable order to simplify testing
-       for(BlockLocation block: locations) {
+       for(BlockLocation block: navigableMap.values()) {
          for(String host: block.getHosts()) {
            if (sizes.containsKey(host)) {
              if (sizes.get(host).get() >= threshold) {
@@ -733,7 +745,7 @@ void createSplit(long offset, long length,
      }
      synchronized (context.splits) {
        context.splits.add(new OrcSplit(file.getPath(), offset, length,
-          hosts, fileMetaInfo, isOriginal, hasBase, deltas));
+          hosts, fileMetaInfo, isOriginal, hasBase, deltas));
      }
    }
diff --git shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
index faae060..708a3ed 100644
--- shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
+++ shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
@@ -37,6 +37,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;

 import javax.security.auth.Subject;
 import javax.security.auth.login.LoginException;
@@ -652,6 +653,17 @@ public UserGroupInformation createProxyUser(String userName) throws IOException
   }

   @Override
+  public TreeMap<Long, BlockLocation> getLocationsWithOffSet(FileSystem fs,
+      FileStatus status) throws IOException {
+    TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>();
+    BlockLocation[] locations = getLocations(fs, status);
+    for (BlockLocation location : locations) {
+      offsetBlockMap.put(location.getOffset(), location);
+    }
+    return offsetBlockMap;
+  }
+
+  @Override
   public void hflush(FSDataOutputStream stream) throws IOException {
     stream.sync();
   }
diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index a809eb1..b4bf9f1 100644
--- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -27,6 +27,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -403,6 +404,17 @@ public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi
   }

   @Override
+  public TreeMap<Long, BlockLocation> getLocationsWithOffSet(FileSystem fs,
+      FileStatus status) throws IOException {
+    TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>();
+    BlockLocation[] locations = getLocations(fs, status);
+    for (BlockLocation location : locations) {
+      offsetBlockMap.put(location.getOffset(), location);
+    }
+    return offsetBlockMap;
+  }
+
+  @Override
   public void hflush(FSDataOutputStream stream) throws IOException {
     stream.sync();
   }
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index f8d9346..00afce8 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -29,6 +29,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;

 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -511,6 +512,17 @@ public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi
   }

   @Override
+  public TreeMap<Long, BlockLocation> getLocationsWithOffSet(FileSystem fs,
+      FileStatus status) throws IOException {
+    TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>();
+    BlockLocation[] locations = getLocations(fs, status);
+    for (BlockLocation location : locations) {
+      offsetBlockMap.put(location.getOffset(), location);
+    }
+    return offsetBlockMap;
+  }
+
+  @Override
   public void hflush(FSDataOutputStream stream) throws IOException {
     stream.hflush();
   }
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 964c38d..13857e3 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -30,6 +30,7 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;

 import javax.security.auth.login.LoginException;

@@ -477,6 +478,19 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte
       FileStatus status) throws IOException;

   /**
+   * Convert the block locations returned by getLocations() into a TreeMap
+   * keyed by each block's start offset.
+   * Mapping offset to BlockLocation makes it O(log n) to find the block
+   * containing a given file offset.
+   * @param fs the file system
+   * @param status the file information
+   * @return a TreeMap from block start offset to BlockLocation
+   * @throws IOException
+   */
+  TreeMap<Long, BlockLocation> getLocationsWithOffSet(FileSystem fs,
+      FileStatus status) throws IOException;
+
+  /**
    * Flush and make visible to other users the changes to the given stream.
    * @param stream the stream to hflush.
    * @throws IOException
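
Reviewer note on the OrcInputFormat change (commentary, not part of the patch): the old code located a block as locations[(int) (offset / blockSize)], which assumes every block is exactly blockSize bytes. The new code keys a TreeMap by block start offset, so floorEntry() finds the block containing any offset in O(log n), and subMap() restricts the host-accounting loop to just the blocks a split overlaps. The self-contained sketch below illustrates that lookup pattern; the class name and the fabricated three-block layout are illustrative assumptions, not code from the patch.

import java.util.Arrays;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.hadoop.fs.BlockLocation;

public class OffsetBlockMapSketch {
  public static void main(String[] args) throws Exception {
    // Fabricated layout: three 128MB blocks, each on its own host.
    long blockSize = 128L * 1024 * 1024;
    TreeMap<Long, BlockLocation> locations = new TreeMap<Long, BlockLocation>();
    for (int i = 0; i < 3; i++) {
      BlockLocation block = new BlockLocation(
          new String[]{"host" + i + ":50010"},   // names
          new String[]{"host" + i},              // hosts
          i * blockSize, blockSize);             // offset, length
      // Same keying as the shims' getLocationsWithOffSet(): start offset.
      locations.put(block.getOffset(), block);
    }

    // A split starting 1MB into the second block and spilling into the third.
    long offset = blockSize + 1024 * 1024;
    long length = blockSize;

    // floorEntry(k) returns the entry with the greatest key <= k: the block
    // whose start offset is nearest at or below k, found in O(log n).
    Map.Entry<Long, BlockLocation> startEntry = locations.floorEntry(offset);
    Map.Entry<Long, BlockLocation> endEntry = locations.floorEntry(offset + length);

    // subMap() limits iteration to the blocks the split actually overlaps,
    // instead of scanning every block of the file as the old loop did.
    NavigableMap<Long, BlockLocation> overlapped =
        locations.subMap(startEntry.getKey(), true, endEntry.getKey(), true);
    for (BlockLocation b : overlapped.values()) {
      System.out.println("block @" + b.getOffset() + " on "
          + Arrays.toString(b.getHosts()));
    }
  }
}

One boundary worth keeping in mind when reading the patch: floorEntry(offset + length) selects the block containing the first byte after the split, so a split ending exactly on a block boundary pulls a zero-overlap block into the inclusive subMap(), which the new else branch turns into a RuntimeException. Whether that case can arise depends on how split boundaries align with HDFS blocks.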
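For additional context (again commentary, not part of the patch): once a split spans several blocks, createSplit() credits each host with the number of split bytes local to it via getOverlap(), then keeps only hosts holding at least 80% of the best host's share (the MIN_INCLUDED_LOCATION threshold referenced in the diff). Below is a minimal sketch of that accounting with made-up block placements; the clamped getOverlap() is my simplification, assumed to match the patch's semantics for overlapping ranges.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HostFilterSketch {
  // Bytes common to [offset1, offset1+length1) and [offset2, offset2+length2),
  // clamped to 0 for disjoint ranges.
  static long getOverlap(long offset1, long length1, long offset2, long length2) {
    long end1 = offset1 + length1;
    long end2 = offset2 + length2;
    return Math.max(0, Math.min(end1, end2) - Math.max(offset1, offset2));
  }

  public static void main(String[] args) {
    final double MIN_INCLUDED_LOCATION = 0.80;  // "at least 80% of the max"
    long blockSize = 128L * 1024 * 1024;

    // Made-up placements: block i starts at i * blockSize.
    String[][] blockHosts = {
        {"hostA", "hostB"},  // block 0
        {"hostB", "hostC"},  // block 1
    };

    // A split covering all of block 0 plus 1MB of block 1.
    long offset = 0;
    long length = blockSize + 1024 * 1024;

    // Credit each host with the split bytes that are local to it.
    Map<String, Long> sizes = new HashMap<String, Long>();
    long maxSize = 0;
    for (int i = 0; i < blockHosts.length; i++) {
      long overlap = getOverlap(offset, length, i * blockSize, blockSize);
      for (String host : blockHosts[i]) {
        Long prev = sizes.get(host);
        long updated = (prev == null ? 0 : prev) + overlap;
        sizes.put(host, updated);
        maxSize = Math.max(maxSize, updated);
      }
    }

    // Drop hosts below 80% of the best host's local byte count.
    long threshold = (long) (maxSize * MIN_INCLUDED_LOCATION);
    List<String> hosts = new ArrayList<String>();
    for (Map.Entry<String, Long> entry : sizes.entrySet()) {
      if (entry.getValue() >= threshold) {
        hosts.add(entry.getKey());
      }
    }
    // hostB holds 100% of the split, hostA ~99%, hostC under 1%:
    // only hostA and hostB survive the filter.
    System.out.println(hosts);
  }
}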