diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1631e2dea63e7f844dd9c4c62ed089268496f824..37f55520bdd9508a1c018f2b18cc54af0b3957b2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -948,6 +948,12 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
         "Define the compression strategy to use while writing data. \n" +
         "This changes the compression level of higher level compression codec (like ZLIB)."),
+    HIVE_ORC_SPLIT_STRATEGY("hive.exec.orc.split.strategy", "HYBRID", new StringSet("HYBRID", "BI", "ETL"),
+        "This is not a user-level config. The BI strategy is used when there are many small ORC files, in" +
+        " which case each file becomes a single ORC split. The ETL strategy is used when there are large" +
+        " files whose average size is greater than the max split size, in which case each ORC split carries" +
+        " the file footer within it. HYBRID chooses between the two based on the average file size."),
+
     HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false,
         "If turned on splits generated by orc will include metadata about the stripes in the file. This\n" +
         "data is read remotely (from the client or HS2 machine) and sent to all the tasks."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index c38867db20e5a92a2c85e46044a2418435f442c4..1b8f8282424463b1ca01c57a7013ce08112f0204 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -21,14 +21,16 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.commons.logging.Log;
@@ -38,15 +40,12 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -60,7 +59,6 @@
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -72,13 +70,13 @@ import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.InvalidInputException; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.StringUtils; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * A MapReduce/Hive input format for ORC files. @@ -107,7 +105,14 @@ InputFormatChecker, VectorizedInputFormatInterface, AcidInputFormat, CombineHiveInputFormat.AvoidSplitCombination { + static enum SplitStrategyKind{ + HYBRID, + BI, + ETL + } + private static final Log LOG = LogFactory.getLog(OrcInputFormat.class); + private static boolean isDebugEnabled = LOG.isDebugEnabled(); static final HadoopShims SHIMS = ShimLoader.getHadoopShims(); static final String MIN_SPLIT_SIZE = SHIMS.getHadoopConfNames().get("MAPREDMINSPLITSIZE"); @@ -265,7 +270,6 @@ private static void includeColumnRecursive(List types, } /** * Take the configuration and figure out which columns we need to include. - * @param options the options to update * @param types the types for the file * @param conf the configuration * @param isOriginal is the file in the original format? @@ -366,30 +370,30 @@ public boolean validateInput(FileSystem fs, HiveConf conf, static class Context { private final Configuration conf; private static Cache footerCache; - private final ExecutorService threadPool; - private final List splits = - new ArrayList(10000); + private static ExecutorService threadPool = null; private final int numBuckets; - private final List errors = new ArrayList(); private final long maxSize; private final long minSize; private final boolean footerInSplits; private final boolean cacheStripeDetails; private final AtomicInteger cacheHitCounter = new AtomicInteger(0); private final AtomicInteger numFilesCounter = new AtomicInteger(0); - private Throwable fatalError = null; private ValidTxnList transactionList; - - /** - * A count of the number of threads that may create more work for the - * thread pool. 
- */ - private int schedulers = 0; + private List deltas; + private boolean[] covered; + private SplitStrategyKind splitStrategyKind; Context(Configuration conf) { this.conf = conf; minSize = conf.getLong(MIN_SPLIT_SIZE, DEFAULT_MIN_SPLIT_SIZE); maxSize = conf.getLong(MAX_SPLIT_SIZE, DEFAULT_MAX_SPLIT_SIZE); + String ss = conf.get(ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname); + if (ss == null) { + splitStrategyKind = SplitStrategyKind.HYBRID; + } else { + LOG.info("Requested " + ss + " split strategy"); + splitStrategyKind = SplitStrategyKind.valueOf(ss); + } footerInSplits = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS); numBuckets = @@ -402,11 +406,13 @@ public boolean validateInput(FileSystem fs, HiveConf conf, cacheStripeDetails = (cacheStripeDetailsSize > 0); - threadPool = Executors.newFixedThreadPool(numThreads, - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("ORC_GET_SPLITS #%d").build()); - synchronized (Context.class) { + if (threadPool == null) { + threadPool = Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("ORC_GET_SPLITS #%d").build()); + } + if (footerCache == null && cacheStripeDetails) { footerCache = CacheBuilder.newBuilder().concurrencyLevel(numThreads) .initialCapacity(cacheStripeDetailsSize).softValues().build(); @@ -415,88 +421,191 @@ public boolean validateInput(FileSystem fs, HiveConf conf, String value = conf.get(ValidTxnList.VALID_TXNS_KEY, Long.MAX_VALUE + ":"); transactionList = new ValidReadTxnList(value); + this.deltas = null; + this.covered = new boolean[numBuckets]; } + } - int getSchedulers() { - return schedulers; - } + interface SplitStrategy { + List getSplits() throws IOException; - /** - * Get the Nth split. - * @param index if index >= 0, count from the front, otherwise count from - * the back. - * @return the Nth file split - */ - OrcSplit getResult(int index) { - if (index >= 0) { - return splits.get(index); - } else { - return splits.get(splits.size() + index); - } + Path getDirectory(); + } + + static final class SplitInfo { + private final Context context; + private final FileSystem fs; + private final FileStatus file; + private final FileInfo fileInfo; + private final boolean isOriginal; + private final List deltas; + private final boolean hasBase; + + SplitInfo(Context context, FileSystem fs, + FileStatus file, FileInfo fileInfo, + boolean isOriginal, + List deltas, + boolean hasBase) throws IOException { + this.context = context; + this.fs = fs; + this.file = file; + this.fileInfo = fileInfo; + this.isOriginal = isOriginal; + this.deltas = deltas; + this.hasBase = hasBase; } + } - List getErrors() { - return errors; + /** + * When there are large files (files that spans multiple hdfs blocks), the OrcSplit will contain + * file footer. ETL split strategy. + */ + static final class ETLSplitStrategy implements SplitStrategy { + Context context; + FileSystem fs; + List files; + boolean isOriginal; + List deltas; + Path dir; + + public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List children, + boolean isOriginal, List deltas) { + this.context = context; + this.fs = fs; + this.files = children; + this.isOriginal = isOriginal; + this.deltas = deltas; + this.dir = dir; } - /** - * Add a unit of work. 
- * @param runnable the object to run - */ - synchronized void schedule(Runnable runnable) { - if (fatalError == null) { - if (runnable instanceof FileGenerator || - runnable instanceof SplitGenerator) { - schedulers += 1; + private FileInfo verifyCachedFileInfo(FileStatus file) { + context.numFilesCounter.incrementAndGet(); + FileInfo fileInfo = Context.footerCache.getIfPresent(file.getPath()); + if (fileInfo != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Info cached for path: " + file.getPath()); + } + if (fileInfo.modificationTime == file.getModificationTime() && + fileInfo.size == file.getLen()) { + // Cached copy is valid + context.cacheHitCounter.incrementAndGet(); + return fileInfo; + } else { + // Invalidate + Context.footerCache.invalidate(file.getPath()); + if (LOG.isDebugEnabled()) { + LOG.debug("Meta-Info for : " + file.getPath() + + " changed. CachedModificationTime: " + + fileInfo.modificationTime + ", CurrentModificationTime: " + + file.getModificationTime() + + ", CachedLength: " + fileInfo.size + ", CurrentLength: " + + file.getLen()); + } } - threadPool.execute(runnable); } else { - throw new RuntimeException("serious problem", fatalError); + if (LOG.isDebugEnabled()) { + LOG.debug("Info not cached for path: " + file.getPath()); + } } + return null; } - /** - * Mark a worker that may generate more work as done. - */ - synchronized void decrementSchedulers() { - schedulers -= 1; - if (schedulers == 0) { - notify(); + @Override + public List getSplits() throws IOException { + List result = Lists.newArrayList(); + for (FileStatus file : files) { + FileInfo info = null; + if (context.cacheStripeDetails) { + info = verifyCachedFileInfo(file); + } + // ignore files of 0 length + if (file.getLen() > 0) { + result.add(new SplitInfo(context, fs, file, info, isOriginal, deltas, true)); + } } + return result; } - synchronized void notifyOnNonIOException(Throwable th) { - fatalError = th; - notify(); + @Override + public Path getDirectory() { + return dir; } - /** - * Wait until all of the tasks are done. It waits until all of the - * threads that may create more work are done and then shuts down the - * thread pool and waits for the final threads to finish. - */ - synchronized void waitForTasks() { - try { - while (schedulers != 0) { - wait(); - if (fatalError != null) { - threadPool.shutdownNow(); - throw new RuntimeException("serious problem", fatalError); - } - } - threadPool.shutdown(); - threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); - } catch (InterruptedException ie) { - throw new IllegalStateException("interrupted", ie); + @Override + public String toString() { + return ETLSplitStrategy.class.getSimpleName() + " strategy for " + dir; + } + } + + /** + * When there are many small files, the entire file will become an OrcSplit. BI split strategy. 
+ */ + static final class BISplitStrategy implements SplitStrategy { + List fileStatuses; + boolean isOriginal; + List deltas; + FileSystem fs; + Context context; + Path dir; + + public BISplitStrategy(Context context, FileSystem fs, + Path dir, List fileStatuses, boolean isOriginal, + List deltas) { + this.context = context; + this.fileStatuses = fileStatuses; + this.isOriginal = isOriginal; + this.deltas = deltas; + this.fs = fs; + this.dir = dir; + } + + @Override + public List getSplits() throws IOException { + List splits = Lists.newArrayList(); + for (FileStatus fileStatus : fileStatuses) { + String[] hosts = SHIMS.getLocationsWithOffset(fs, fileStatus).firstEntry().getValue() + .getHosts(); + OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts, + null, isOriginal, true, deltas, -1); + splits.add(orcSplit); } + return splits; + } + + @Override + public Path getDirectory() { + return dir; + } + + @Override + public String toString() { + return BISplitStrategy.class.getSimpleName() + " strategy for " + dir; + } + } + + static final class ACIDSplitStrategy implements SplitStrategy { + Path dir; + + public ACIDSplitStrategy(Path dir) { + this.dir = dir; + } + + @Override + public List getSplits() throws IOException { + return null; + } + + @Override + public Path getDirectory() { + return dir; } } /** * Given a directory, get the list of files and blocks in those files. - * A thread is used for each directory. + * To parallelize file generator use "mapreduce.input.fileinputformat.list-status.num-threads" */ - static final class FileGenerator implements Runnable { + static final class FileGenerator implements Callable { private final Context context; private final FileSystem fs; private final Path dir; @@ -507,116 +616,66 @@ synchronized void waitForTasks() { this.dir = dir; } - private void scheduleSplits(FileStatus file, - boolean isOriginal, - boolean hasBase, - List deltas) throws IOException{ - FileInfo info = null; - if (context.cacheStripeDetails) { - info = verifyCachedFileInfo(file); - } - new SplitGenerator(context, fs, file, info, isOriginal, deltas, - hasBase).schedule(); - } - - /** - * For each path, get the list of files and blocks that they consist of. - */ @Override - public void run() { - try { - AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, - context.conf, context.transactionList); - List deltas = - AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()); - Path base = dirInfo.getBaseDirectory(); - List original = dirInfo.getOriginalFiles(); - - boolean[] covered = new boolean[context.numBuckets]; - boolean isOriginal = base == null; - - // if we have a base to work from - if (base != null || !original.isEmpty()) { - - // find the base files (original or new style) - List children = original; - if (base != null) { - children = SHIMS.listLocatedStatus(fs, base, - AcidUtils.hiddenFileFilter); - } - - // for each child, schedule splits and mark off the bucket - for(FileStatus child: children) { - AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename - (child.getPath(), context.conf); - scheduleSplits(child, isOriginal, true, deltas); - int b = opts.getBucket(); - // If the bucket is in the valid range, mark it as covered. - // I wish Hive actually enforced bucketing all of the time. 
- if (b >= 0 && b < covered.length) { - covered[b] = true; - } - } + public SplitStrategy call() throws IOException { + SplitStrategy splitStrategy = null; + AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, + context.conf, context.transactionList); + context.deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()); + Path base = dirInfo.getBaseDirectory(); + List original = dirInfo.getOriginalFiles(); + + boolean isOriginal = base == null; + + // if we have a base to work from + if (base != null || !original.isEmpty()) { + + // find the base files (original or new style) + List children = original; + if (base != null) { + children = SHIMS.listLocatedStatus(fs, base, + AcidUtils.hiddenFileFilter); } - // Generate a split for any buckets that weren't covered. - // This happens in the case where a bucket just has deltas and no - // base. - if (!deltas.isEmpty()) { - for (int b = 0; b < context.numBuckets; ++b) { - if (!covered[b]) { - synchronized (context.splits) { - context.splits.add(new OrcSplit(dir, b, 0, new String[0], null, - false, false, deltas)); - } - } + long totalFileSize = 0; + for (FileStatus child : children) { + totalFileSize += child.getLen(); + AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename + (child.getPath(), context.conf); + int b = opts.getBucket(); + // If the bucket is in the valid range, mark it as covered. + // I wish Hive actually enforced bucketing all of the time. + if (b >= 0 && b < context.covered.length) { + context.covered[b] = true; } } - } catch (Throwable th) { - if (!(th instanceof IOException)) { - LOG.error("Unexpected Exception", th); - } - synchronized (context.errors) { - context.errors.add(th); - } - if (!(th instanceof IOException)) { - context.notifyOnNonIOException(th); - } - } finally { - context.decrementSchedulers(); - } - } - private FileInfo verifyCachedFileInfo(FileStatus file) { - context.numFilesCounter.incrementAndGet(); - FileInfo fileInfo = Context.footerCache.getIfPresent(file.getPath()); - if (fileInfo != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Info cached for path: " + file.getPath()); - } - if (fileInfo.modificationTime == file.getModificationTime() && - fileInfo.size == file.getLen()) { - // Cached copy is valid - context.cacheHitCounter.incrementAndGet(); - return fileInfo; - } else { - // Invalidate - Context.footerCache.invalidate(file.getPath()); - if (LOG.isDebugEnabled()) { - LOG.debug("Meta-Info for : " + file.getPath() + - " changed. 
CachedModificationTime: " - + fileInfo.modificationTime + ", CurrentModificationTime: " - + file.getModificationTime() - + ", CachedLength: " + fileInfo.size + ", CurrentLength: " + - file.getLen()); - } + int numFiles = children.size(); + long avgFileSize = totalFileSize / numFiles; + switch (context.splitStrategyKind) { + case BI: + splitStrategy = new BISplitStrategy(context, fs, dir, children, + isOriginal, context.deltas); + break; + case ETL: + splitStrategy = new ETLSplitStrategy(context, fs, dir, children, + isOriginal, context.deltas); + break; + default: + if (avgFileSize > context.maxSize) { + splitStrategy = new ETLSplitStrategy(context, fs, dir, children, + isOriginal, context.deltas); + } else { + splitStrategy = new BISplitStrategy(context, fs, dir, children, + isOriginal, context.deltas); + } + break; } } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Info not cached for path: " + file.getPath()); - } + splitStrategy = new ACIDSplitStrategy(dir); } - return null; + + return splitStrategy; } } @@ -624,7 +683,7 @@ private FileInfo verifyCachedFileInfo(FileStatus file) { * Split the stripes of a given file into input splits. * A thread is used for each file. */ - static final class SplitGenerator implements Runnable { + static final class SplitGenerator implements Callable> { private final Context context; private final FileSystem fs; private final FileStatus file; @@ -639,40 +698,25 @@ private FileInfo verifyCachedFileInfo(FileStatus file) { private final List deltas; private final boolean hasBase; private OrcFile.WriterVersion writerVersion; + private long projColsUncompressedSize; - SplitGenerator(Context context, FileSystem fs, - FileStatus file, FileInfo fileInfo, - boolean isOriginal, - List deltas, - boolean hasBase) throws IOException { - this.context = context; - this.fs = fs; - this.file = file; + public SplitGenerator(SplitInfo splitInfo) throws IOException { + this.context = splitInfo.context; + this.fs = splitInfo.fs; + this.file = splitInfo.file; this.blockSize = file.getBlockSize(); - this.fileInfo = fileInfo; + this.fileInfo = splitInfo.fileInfo; locations = SHIMS.getLocationsWithOffset(fs, file); - this.isOriginal = isOriginal; - this.deltas = deltas; - this.hasBase = hasBase; + this.isOriginal = splitInfo.isOriginal; + this.deltas = splitInfo.deltas; + this.hasBase = splitInfo.hasBase; + this.projColsUncompressedSize = -1; } Path getPath() { return file.getPath(); } - void schedule() throws IOException { - if(locations.size() == 1 && file.getLen() < context.maxSize) { - String[] hosts = locations.firstEntry().getValue().getHosts(); - synchronized (context.splits) { - context.splits.add(new OrcSplit(file.getPath(), 0, file.getLen(), - hosts, fileMetaInfo, isOriginal, hasBase, deltas)); - } - } else { - // if it requires a compute task - context.schedule(this); - } - } - @Override public String toString() { return "splitter(" + file.getPath() + ")"; @@ -707,7 +751,7 @@ static long getOverlap(long offset1, long length1, * @param fileMetaInfo file metadata from footer and postscript * @throws IOException */ - void createSplit(long offset, long length, + OrcSplit createSplit(long offset, long length, ReaderImpl.FileMetaInfo fileMetaInfo) throws IOException { String[] hosts; Map.Entry startEntry = locations.floorEntry(offset); @@ -761,10 +805,8 @@ void createSplit(long offset, long length, hosts = new String[hostList.size()]; hostList.toArray(hosts); } - synchronized (context.splits) { - context.splits.add(new OrcSplit(file.getPath(), offset, length, - hosts, 
fileMetaInfo, isOriginal, hasBase, deltas)); - } + return new OrcSplit(file.getPath(), offset, length, hosts, fileMetaInfo, + isOriginal, hasBase, deltas, projColsUncompressedSize); } /** @@ -772,146 +814,122 @@ void createSplit(long offset, long length, * block size and the configured minimum and maximum sizes. */ @Override - public void run() { - try { - populateAndCacheStripeDetails(); - - // figure out which stripes we need to read - boolean[] includeStripe = null; - // we can't eliminate stripes if there are deltas because the - // deltas may change the rows making them match the predicate. - if (deltas.isEmpty()) { - Reader.Options options = new Reader.Options(); - options.include(genIncludedColumns(types, context.conf, isOriginal)); - setSearchArgument(options, types, context.conf, isOriginal); - // only do split pruning if HIVE-8732 has been fixed in the writer - if (options.getSearchArgument() != null && - writerVersion != OrcFile.WriterVersion.ORIGINAL) { - SearchArgument sarg = options.getSearchArgument(); - List sargLeaves = sarg.getLeaves(); - List stripeStats = metadata.getStripeStatistics(); - int[] filterColumns = RecordReaderImpl.mapSargColumns(sargLeaves, - options.getColumnNames(), getRootColumn(isOriginal)); - - if (stripeStats != null) { - // eliminate stripes that doesn't satisfy the predicate condition - includeStripe = new boolean[stripes.size()]; - for(int i=0; i < stripes.size(); ++i) { - includeStripe[i] = (i >= stripeStats.size()) || - isStripeSatisfyPredicate(stripeStats.get(i), sarg, - filterColumns); - if (LOG.isDebugEnabled() && !includeStripe[i]) { - LOG.debug("Eliminating ORC stripe-" + i + " of file '" + - file.getPath() + "' as it did not satisfy " + - "predicate condition."); - } + public List call() throws IOException { + List splits = Lists.newArrayList(); + populateAndCacheStripeDetails(); + + // figure out which stripes we need to read + boolean[] includeStripe = null; + // we can't eliminate stripes if there are deltas because the + // deltas may change the rows making them match the predicate. 
+ if (deltas.isEmpty()) { + Reader.Options options = new Reader.Options(); + options.include(genIncludedColumns(types, context.conf, isOriginal)); + setSearchArgument(options, types, context.conf, isOriginal); + // only do split pruning if HIVE-8732 has been fixed in the writer + if (options.getSearchArgument() != null && + writerVersion != OrcFile.WriterVersion.ORIGINAL) { + SearchArgument sarg = options.getSearchArgument(); + List sargLeaves = sarg.getLeaves(); + List stripeStats = metadata.getStripeStatistics(); + int[] filterColumns = RecordReaderImpl.mapSargColumns(sargLeaves, + options.getColumnNames(), getRootColumn(isOriginal)); + + if (stripeStats != null) { + // eliminate stripes that doesn't satisfy the predicate condition + includeStripe = new boolean[stripes.size()]; + for (int i = 0; i < stripes.size(); ++i) { + includeStripe[i] = (i >= stripeStats.size()) || + isStripeSatisfyPredicate(stripeStats.get(i), sarg, + filterColumns); + if (LOG.isDebugEnabled() && !includeStripe[i]) { + LOG.debug("Eliminating ORC stripe-" + i + " of file '" + + file.getPath() + "' as it did not satisfy " + + "predicate condition."); } } } } + } - // if we didn't have predicate pushdown, read everything - if (includeStripe == null) { - includeStripe = new boolean[stripes.size()]; - Arrays.fill(includeStripe, true); - } + // if we didn't have predicate pushdown, read everything + if (includeStripe == null) { + includeStripe = new boolean[stripes.size()]; + Arrays.fill(includeStripe, true); + } - long currentOffset = -1; - long currentLength = 0; - int idx = -1; - for(StripeInformation stripe: stripes) { - idx++; - - if (!includeStripe[idx]) { - // create split for the previous unfinished stripe - if (currentOffset != -1) { - createSplit(currentOffset, currentLength, fileMetaInfo); - currentOffset = -1; - } - continue; - } + long currentOffset = -1; + long currentLength = 0; + int idx = -1; + for (StripeInformation stripe : stripes) { + idx++; - // if we are working on a stripe, over the min stripe size, and - // crossed a block boundary, cut the input split here. - if (currentOffset != -1 && currentLength > context.minSize && - (currentOffset / blockSize != stripe.getOffset() / blockSize)) { - createSplit(currentOffset, currentLength, fileMetaInfo); - currentOffset = -1; - } - // if we aren't building a split, start a new one. - if (currentOffset == -1) { - currentOffset = stripe.getOffset(); - currentLength = stripe.getLength(); - } else { - currentLength = - (stripe.getOffset() + stripe.getLength()) - currentOffset; - } - if (currentLength >= context.maxSize) { - createSplit(currentOffset, currentLength, fileMetaInfo); + if (!includeStripe[idx]) { + // create split for the previous unfinished stripe + if (currentOffset != -1) { + splits.add(createSplit(currentOffset, currentLength, fileMetaInfo)); currentOffset = -1; } + continue; } - if (currentOffset != -1) { - createSplit(currentOffset, currentLength, fileMetaInfo); - } - } catch (Throwable th) { - if (!(th instanceof IOException)) { - LOG.error("Unexpected Exception", th); + + // if we are working on a stripe, over the min stripe size, and + // crossed a block boundary, cut the input split here. + if (currentOffset != -1 && currentLength > context.minSize && + (currentOffset / blockSize != stripe.getOffset() / blockSize)) { + splits.add(createSplit(currentOffset, currentLength, fileMetaInfo)); + currentOffset = -1; } - synchronized (context.errors) { - context.errors.add(th); + // if we aren't building a split, start a new one. 
+ if (currentOffset == -1) { + currentOffset = stripe.getOffset(); + currentLength = stripe.getLength(); + } else { + currentLength = + (stripe.getOffset() + stripe.getLength()) - currentOffset; } - if (!(th instanceof IOException)) { - context.notifyOnNonIOException(th); + if (currentLength >= context.maxSize) { + splits.add(createSplit(currentOffset, currentLength, fileMetaInfo)); + currentOffset = -1; } - } finally { - context.decrementSchedulers(); } + if (currentOffset != -1) { + splits.add(createSplit(currentOffset, currentLength, fileMetaInfo)); + } + + return splits; } - private void populateAndCacheStripeDetails() { - try { - Reader orcReader; - if (fileInfo != null) { - stripes = fileInfo.stripeInfos; - fileMetaInfo = fileInfo.fileMetaInfo; - metadata = fileInfo.metadata; - types = fileInfo.types; - writerVersion = fileInfo.writerVersion; - // For multiple runs, in case sendSplitsInFooter changes - if (fileMetaInfo == null && context.footerInSplits) { - orcReader = OrcFile.createReader(file.getPath(), - OrcFile.readerOptions(context.conf).filesystem(fs)); - fileInfo.fileMetaInfo = ((ReaderImpl) orcReader).getFileMetaInfo(); - fileInfo.metadata = orcReader.getMetadata(); - fileInfo.types = orcReader.getTypes(); - fileInfo.writerVersion = orcReader.getWriterVersion(); - } - } else { - orcReader = OrcFile.createReader(file.getPath(), - OrcFile.readerOptions(context.conf).filesystem(fs)); - stripes = orcReader.getStripes(); - metadata = orcReader.getMetadata(); - types = orcReader.getTypes(); - writerVersion = orcReader.getWriterVersion(); - fileMetaInfo = context.footerInSplits ? - ((ReaderImpl) orcReader).getFileMetaInfo() : null; - if (context.cacheStripeDetails) { - // Populate into cache. - Context.footerCache.put(file.getPath(), - new FileInfo(file.getModificationTime(), file.getLen(), stripes, - metadata, types, fileMetaInfo, writerVersion)); - } - } - } catch (Throwable th) { - if (!(th instanceof IOException)) { - LOG.error("Unexpected Exception", th); - } - synchronized (context.errors) { - context.errors.add(th); + private void populateAndCacheStripeDetails() throws IOException { + Reader orcReader = OrcFile.createReader(file.getPath(), + OrcFile.readerOptions(context.conf).filesystem(fs)); + List projCols = ColumnProjectionUtils.getReadColumnNames(context.conf); + projColsUncompressedSize = orcReader.getRawDataSizeOfColumns(projCols); + if (fileInfo != null) { + stripes = fileInfo.stripeInfos; + fileMetaInfo = fileInfo.fileMetaInfo; + metadata = fileInfo.metadata; + types = fileInfo.types; + writerVersion = fileInfo.writerVersion; + // For multiple runs, in case sendSplitsInFooter changes + if (fileMetaInfo == null && context.footerInSplits) { + fileInfo.fileMetaInfo = ((ReaderImpl) orcReader).getFileMetaInfo(); + fileInfo.metadata = orcReader.getMetadata(); + fileInfo.types = orcReader.getTypes(); + fileInfo.writerVersion = orcReader.getWriterVersion(); } - if (!(th instanceof IOException)) { - context.notifyOnNonIOException(th); + } else { + stripes = orcReader.getStripes(); + metadata = orcReader.getMetadata(); + types = orcReader.getTypes(); + writerVersion = orcReader.getWriterVersion(); + fileMetaInfo = context.footerInSplits ? + ((ReaderImpl) orcReader).getFileMetaInfo() : null; + if (context.cacheStripeDetails) { + // Populate into cache. 
+ Context.footerCache.put(file.getPath(), + new FileInfo(file.getModificationTime(), file.getLen(), stripes, + metadata, types, fileMetaInfo, writerVersion)); } } } @@ -943,29 +961,76 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics, throws IOException { // use threads to resolve directories into splits Context context = new Context(conf); - for(Path dir: getInputPaths(conf)) { + List splits = Lists.newArrayList(); + List> pathFutures = Lists.newArrayList(); + List> splitFutures = Lists.newArrayList(); + + // multi-threaded file statuses and split strategy + for (Path dir : getInputPaths(conf)) { FileSystem fs = dir.getFileSystem(conf); - context.schedule(new FileGenerator(context, fs, dir)); - } - context.waitForTasks(); - // deal with exceptions - if (!context.errors.isEmpty()) { - List errors = - new ArrayList(context.errors.size()); - for(Throwable th: context.errors) { - if (th instanceof IOException) { - errors.add((IOException) th); + FileGenerator fileGenerator = new FileGenerator(context, fs, dir); + pathFutures.add(context.threadPool.submit(fileGenerator)); + } + + // complete path futures and schedule split generation + try { + for (Future pathFuture : pathFutures) { + SplitStrategy splitStrategy = (SplitStrategy) pathFuture.get(); + + if (isDebugEnabled) { + LOG.debug(splitStrategy); + } + + if (splitStrategy instanceof BISplitStrategy) { + splits.addAll(splitStrategy.getSplits()); + } else if (splitStrategy instanceof ETLSplitStrategy) { + List splitInfos = splitStrategy.getSplits(); + for (SplitInfo splitInfo : splitInfos) { + splitFutures.add(context.threadPool.submit(new SplitGenerator(splitInfo))); + } } else { - throw new RuntimeException("serious problem", th); + // ACIDSplitStrategy + // Generate a split for any buckets that weren't covered. + // This happens in the case where a bucket just has deltas and no base. 
+ if (!context.deltas.isEmpty()) { + for (int b = 0; b < context.numBuckets; ++b) { + if (!context.covered[b]) { + splits.add(new OrcSplit(splitStrategy.getDirectory(), b, 0, new String[0], null, + false, false, context.deltas, -1)); + } + } + } } } - throw new InvalidInputException(errors); + + // complete split futures + for (Future splitFuture : splitFutures) { + splits.addAll((Collection) splitFuture.get()); + } + } catch (Exception e) { + cancelFutures(pathFutures); + cancelFutures(splitFutures); + throw new RuntimeException("serious problem", e); } + if (context.cacheStripeDetails) { LOG.info("FooterCacheHitRatio: " + context.cacheHitCounter.get() + "/" + context.numFilesCounter.get()); } - return context.splits; + + if (isDebugEnabled) { + for (OrcSplit split : splits) { + LOG.debug(split + " projected_columns_uncompressed_size: " + + split.getProjectedColumnsUncompressedSize()); + } + } + return splits; + } + + private static void cancelFutures(List> futures) { + for (Future future : futures) { + future.cancel(true); + } } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index 84192d5bab88fb3024534d2adaeb2526cf712174..0c7dd40f5bb75ab96a4bf5fb8a4dce7e22b507b9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -43,6 +43,7 @@ private boolean hasBase; private final List deltas = new ArrayList(); private OrcFile.WriterVersion writerVersion; + private long projColsUncompressedSize; static final int BASE_FLAG = 4; static final int ORIGINAL_FLAG = 2; @@ -57,13 +58,14 @@ protected OrcSplit(){ public OrcSplit(Path path, long offset, long length, String[] hosts, ReaderImpl.FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase, - List deltas) { + List deltas, long projectedDataSize) { super(path, offset, length, hosts); this.fileMetaInfo = fileMetaInfo; hasFooter = this.fileMetaInfo != null; this.isOriginal = isOriginal; this.hasBase = hasBase; this.deltas.addAll(deltas); + this.projColsUncompressedSize = projectedDataSize; } @Override @@ -149,4 +151,8 @@ public boolean hasBase() { public List getDeltas() { return deltas; } + + public long getProjectedColumnsUncompressedSize() { + return projColsUncompressedSize; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 50f417b20a8a8358215cecd1ae2a62feabd73a13..172b09742e0899018b43926ccb6ee271345d16b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql.io.orc; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_ZEROCOPY; - import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -36,14 +34,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.DiskRange; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; -import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.util.JavaDataModel; import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Text; -import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk; import com.google.common.collect.Lists; import com.google.common.collect.Sets; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index cb28b3fc74efc4e0b6da04623305306dcbd0005e..4a48f90640801a7e51ce6ed4c773cb8a2d2c97ad 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.InputFormatChecker; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; @@ -98,7 +99,6 @@ import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.StringUtils; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -394,22 +394,9 @@ public void testGetInputPaths() throws Exception { OrcInputFormat.getInputPaths(conf)); } - static class TestContext extends OrcInputFormat.Context { - List queue = new ArrayList(); - - TestContext(Configuration conf) { - super(conf); - } - - @Override - public void schedule(Runnable runnable) { - queue.add(runnable); - } - } - @Test public void testFileGenerator() throws Exception { - TestContext context = new TestContext(conf); + OrcInputFormat.Context context = new OrcInputFormat.Context(conf); MockFileSystem fs = new MockFileSystem(conf, new MockFile("mock:/a/b/part-00", 1000, new byte[0]), new MockFile("mock:/a/b/part-01", 1000, new byte[0]), @@ -419,21 +406,22 @@ public void testFileGenerator() throws Exception { OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a/b")); - gen.run(); - if (context.getErrors().size() > 0) { - for(Throwable th: context.getErrors()) { - System.out.println(StringUtils.stringifyException(th)); - } - throw new IOException("Errors during file generation"); - } - assertEquals(-1, context.getSchedulers()); - assertEquals(3, context.queue.size()); - assertEquals(new Path("mock:/a/b/part-00"), - ((OrcInputFormat.SplitGenerator) context.queue.get(0)).getPath()); - assertEquals(new Path("mock:/a/b/part-01"), - ((OrcInputFormat.SplitGenerator) context.queue.get(1)).getPath()); - assertEquals(new Path("mock:/a/b/part-04"), - ((OrcInputFormat.SplitGenerator) context.queue.get(2)).getPath()); + SplitStrategy splitStrategy = gen.call(); + assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy); + + conf.set("mapreduce.input.fileinputformat.split.maxsize", "500"); + context = new OrcInputFormat.Context(conf); + fs = new MockFileSystem(conf, + new MockFile("mock:/a/b/part-00", 1000, new byte[1000]), + new MockFile("mock:/a/b/part-01", 1000, new byte[1000]), + new MockFile("mock:/a/b/_part-02", 1000, new byte[1000]), + new MockFile("mock:/a/b/.part-03", 1000, new byte[1000]), + new MockFile("mock:/a/b/part-04", 1000, new byte[1000])); + gen = new OrcInputFormat.FileGenerator(context, fs, + new MockPath(fs, "mock:/a/b")); + splitStrategy = gen.call(); + 
assertEquals(true, splitStrategy instanceof OrcInputFormat.ETLSplitStrategy); + } public static class MockBlock { @@ -848,11 +836,10 @@ public void testAddSplit() throws Exception { new MockBlock("host5-1", "host5-2", "host5-3"))); OrcInputFormat.Context context = new OrcInputFormat.Context(conf); OrcInputFormat.SplitGenerator splitter = - new OrcInputFormat.SplitGenerator(context, fs, + new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, fs.getFileStatus(new Path("/a/file")), null, true, - new ArrayList(), true); - splitter.createSplit(0, 200, null); - OrcSplit result = context.getResult(-1); + new ArrayList(), true)); + OrcSplit result = splitter.createSplit(0, 200, null); assertEquals(0, result.getStart()); assertEquals(200, result.getLength()); assertEquals("mock:/a/file", result.getPath().toString()); @@ -861,15 +848,13 @@ public void testAddSplit() throws Exception { assertEquals("host1-1", locs[0]); assertEquals("host1-2", locs[1]); assertEquals("host1-3", locs[2]); - splitter.createSplit(500, 600, null); - result = context.getResult(-1); + result = splitter.createSplit(500, 600, null); locs = result.getLocations(); assertEquals(3, locs.length); assertEquals("host2-1", locs[0]); assertEquals("host0", locs[1]); assertEquals("host2-3", locs[2]); - splitter.createSplit(0, 2500, null); - result = context.getResult(-1); + result = splitter.createSplit(0, 2500, null); locs = result.getLocations(); assertEquals(1, locs.length); assertEquals("host0", locs[0]); @@ -892,48 +877,36 @@ public void testSplitGenerator() throws Exception { conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 200); OrcInputFormat.Context context = new OrcInputFormat.Context(conf); OrcInputFormat.SplitGenerator splitter = - new OrcInputFormat.SplitGenerator(context, fs, + new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, fs.getFileStatus(new Path("/a/file")), null, true, - new ArrayList(), true); - splitter.run(); - if (context.getErrors().size() > 0) { - for(Throwable th: context.getErrors()) { - System.out.println(StringUtils.stringifyException(th)); - } - throw new IOException("Errors during splitting"); - } - OrcSplit result = context.getResult(0); + new ArrayList(), true)); + List results = splitter.call(); + OrcSplit result = results.get(0); assertEquals(3, result.getStart()); assertEquals(497, result.getLength()); - result = context.getResult(1); + result = results.get(1); assertEquals(500, result.getStart()); assertEquals(600, result.getLength()); - result = context.getResult(2); + result = results.get(2); assertEquals(1100, result.getStart()); assertEquals(400, result.getLength()); - result = context.getResult(3); + result = results.get(3); assertEquals(1500, result.getStart()); assertEquals(300, result.getLength()); - result = context.getResult(4); + result = results.get(4); assertEquals(1800, result.getStart()); assertEquals(200, result.getLength()); // test min = 0, max = 0 generates each stripe conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 0); conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 0); context = new OrcInputFormat.Context(conf); - splitter = new OrcInputFormat.SplitGenerator(context, fs, + splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList(), - true); - splitter.run(); - if (context.getErrors().size() > 0) { - for(Throwable th: context.getErrors()) { - System.out.println(StringUtils.stringifyException(th)); - } - throw new IOException("Errors during splitting"); - 
} + true)); + results = splitter.call(); for(int i=0; i < stripeSizes.length; ++i) { assertEquals("checking stripe " + i + " size", - stripeSizes[i], context.getResult(i).getLength()); + stripeSizes[i], results.get(i).getLength()); } } diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java index e403ad9b34c482bf720bdec1c91057a5f53d216b..08a4e991ed3850cab9342df960cd9d3cd4c888a7 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java @@ -19,11 +19,14 @@ package org.apache.hadoop.hive.serde2; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.StringUtils; +import com.google.common.collect.Lists; + /** * ColumnProjectionUtils. * @@ -151,6 +154,14 @@ private static void appendReadColumnNames(StringBuilder readColumnNamesBuffer, L return result; } + public static List getReadColumnNames(Configuration conf) { + String colNames = conf.get(READ_COLUMN_NAMES_CONF_STR, READ_COLUMN_IDS_CONF_STR_DEFAULT); + if (colNames != null && !colNames.isEmpty()) { + return Arrays.asList(colNames.split(",")); + } + return Lists.newArrayList(); + } + private static void setReadColumnIDConf(Configuration conf, String id) { if (id.trim().isEmpty()) { conf.set(READ_COLUMN_IDS_CONF_STR, READ_COLUMN_IDS_CONF_STR_DEFAULT);
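
The new hive.exec.orc.split.strategy knob added to HiveConf at the top of this patch is read by OrcInputFormat.Context through ConfVars.HIVE_ORC_SPLIT_STRATEGY, so it can be set like any other Hive property. A minimal sketch, not part of the patch (the class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class OrcSplitStrategyConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new HiveConf();
    // Valid values per the StringSet declared above: HYBRID (default), BI, ETL.
    conf.set(ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL");
    // OrcInputFormat.Context falls back to HYBRID when the key is unset.
    System.out.println(conf.get("hive.exec.orc.split.strategy", "HYBRID"));
  }
}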
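
The HYBRID branch of FileGenerator.call() reduces to comparing the directory's average file size against the max split size (mapreduce.input.fileinputformat.split.maxsize, surfaced as context.maxSize). The standalone sketch below restates that rule in isolation; the class and method names are illustrative and the 256 MB figure is only an assumed default, not taken from the patch.

public class HybridStrategyRuleSketch {
  /** Mirrors the default branch of FileGenerator.call(): ETL for large files on average, BI otherwise. */
  static String chooseStrategy(long totalFileSize, int numFiles, long maxSplitSize) {
    long avgFileSize = totalFileSize / numFiles;
    return avgFileSize > maxSplitSize ? "ETL" : "BI";
  }

  public static void main(String[] args) {
    long maxSplitSize = 256L * 1024 * 1024;  // assumed default max split size
    // 20 files totalling 10 GiB -> 512 MiB average -> ETL
    System.out.println(chooseStrategy(10L * 1024 * 1024 * 1024, 20, maxSplitSize));
    // 100 files totalling 200 MiB -> 2 MiB average -> BI
    System.out.println(chooseStrategy(200L * 1024 * 1024, 100, maxSplitSize));
  }
}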
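
Although the patch replaces the schedule()/waitForTasks() machinery with Callables and Futures inside OrcInputFormat, callers still go through the standard mapred API. A rough usage sketch under that assumption (the input path is made up):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

public class OrcGetSplitsSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    FileInputFormat.setInputPaths(job, new Path("/warehouse/orc_table"));  // hypothetical path
    job.set("hive.exec.orc.split.strategy", "BI");  // force a strategy instead of the HYBRID default
    InputSplit[] splits = new OrcInputFormat().getSplits(job, 1);
    System.out.println("generated " + splits.length + " splits");
  }
}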
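
The new ColumnProjectionUtils.getReadColumnNames() helper simply splits the comma-separated read-column-names property into a list. A minimal usage sketch, assuming the key behind READ_COLUMN_NAMES_CONF_STR is hive.io.file.readcolumn.names and that it would normally be populated by the planner rather than set directly:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;

public class ReadColumnNamesSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Normally filled in via ColumnProjectionUtils.appendReadColumns(...) during planning.
    conf.set("hive.io.file.readcolumn.names", "id,name,amount");
    List<String> projCols = ColumnProjectionUtils.getReadColumnNames(conf);
    System.out.println(projCols);  // [id, name, amount]; an unset or empty value yields an empty list
  }
}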