diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f207e9f..730ed14 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -516,6 +516,10 @@
     HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold",
         0.8f),
 
+    HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
+    HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),
+    HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10),
+
     HIVESKEWJOIN("hive.optimize.skewjoin", false),
     HIVECONVERTJOIN("hive.auto.convert.join", true),
     HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true),
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index c845f70..b3f1f7c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -58,6 +60,10 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * A MapReduce/Hive input format for ORC files.
  */
@@ -69,6 +75,7 @@
   private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
   static final String MIN_SPLIT_SIZE = "mapred.min.split.size";
   static final String MAX_SPLIT_SIZE = "mapred.max.split.size";
+
   private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
   private static final long DEFAULT_MAX_SPLIT_SIZE = 256 * 1024 * 1024;
@@ -248,11 +255,15 @@ public static SearchArgument createSarg(List<OrcProto.Type> types, Configuration
       //If CombineHiveInputFormat is used, it works with FileSplit and not OrcSplit
       reader = OrcFile.createReader(fs, path);
     } else {
-      //We have OrcSplit, which has footer metadata cached, so used the appropriate reader
+      //We have OrcSplit, which may have footer metadata cached, so use the appropriate reader
       //constructor
       OrcSplit orcSplit = (OrcSplit) fSplit;
-      FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
-      reader = OrcFile.createReader(fs, path, fMetaInfo);
+      if (orcSplit.hasFooter()) {
+        FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
+        reader = OrcFile.createReader(fs, path, fMetaInfo);
+      } else {
+        reader = OrcFile.createReader(fs, path);
+      }
     }
     return new OrcRecordReader(reader, conf, fSplit.getStart(), fSplit.getLength());
   }
@@ -311,13 +322,17 @@ private boolean isVectorMode(Configuration conf) {
    * the different worker threads.
    */
   static class Context {
-    private final ExecutorService threadPool = Executors.newFixedThreadPool(10);
+    private static Cache<Path, FileInfo> footerCache;
+    private final ExecutorService threadPool;
     private final List<OrcSplit> splits = new ArrayList<OrcSplit>(10000);
     private final List<Throwable> errors = new ArrayList<Throwable>();
     private final HadoopShims shims = ShimLoader.getHadoopShims();
-    private final Configuration conf;
     private final long maxSize;
     private final long minSize;
+    private final boolean footerInSplits;
+    private final boolean cacheStripeDetails;
+    private final AtomicInteger cacheHitCounter = new AtomicInteger(0);
+    private final AtomicInteger numFilesCounter = new AtomicInteger(0);
 
     /**
      * A count of the number of threads that may create more work for the
@@ -326,9 +341,24 @@ private boolean isVectorMode(Configuration conf) {
     private int schedulers = 0;
 
     Context(Configuration conf) {
-      this.conf = conf;
       minSize = conf.getLong(MIN_SPLIT_SIZE, DEFAULT_MIN_SPLIT_SIZE);
       maxSize = conf.getLong(MAX_SPLIT_SIZE, DEFAULT_MAX_SPLIT_SIZE);
+      footerInSplits = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS);
+      int cacheStripeDetailsSize = HiveConf.getIntVar(conf,
+          ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE);
+      int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
+
+      cacheStripeDetails = (cacheStripeDetailsSize > 0);
+
+      threadPool = Executors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ORC_GET_SPLITS #%d").build());
+
+      synchronized (Context.class) {
+        if (footerCache == null && cacheStripeDetails) {
+          footerCache = CacheBuilder.newBuilder().concurrencyLevel(numThreads)
+              .initialCapacity(cacheStripeDetailsSize).softValues().build();
+        }
+      }
     }
 
     int getSchedulers() {
@@ -413,16 +443,22 @@ synchronized void waitForTasks() {
     @Override
     public void run() {
       try {
+        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_BLOCK_LOCATIONS);
         Iterator<FileStatus> itr = context.shims.listLocatedStatus(fs, dir, hiddenFileFilter);
         while (itr.hasNext()) {
           FileStatus file = itr.next();
           if (!file.isDir()) {
-            context.schedule(new SplitGenerator(context, fs, file));
+            FileInfo fileInfo = null;
+            if (context.cacheStripeDetails) {
+              fileInfo = verifyCachedFileInfo(file);
+            }
+            context.schedule(new SplitGenerator(context, fs, file, fileInfo));
           }
         }
         // mark the fact that we are done
         context.decrementSchedulers();
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_BLOCK_LOCATIONS);
       } catch (Throwable th) {
         context.decrementSchedulers();
         synchronized (context.errors) {
@@ -430,6 +466,34 @@ public void run() {
         }
       }
     }
+
+    private FileInfo verifyCachedFileInfo(FileStatus file) {
+      context.numFilesCounter.incrementAndGet();
+      FileInfo fileInfo = Context.footerCache.getIfPresent(file.getPath());
+      if (fileInfo != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Info cached for path: " + file.getPath());
+        }
+        if (fileInfo.modificationTime == file.getModificationTime() && fileInfo.size == file.getLen()) {
+          // Cached copy is valid
+          context.cacheHitCounter.incrementAndGet();
+          return fileInfo;
+        } else {
+          // Invalidate
+          Context.footerCache.invalidate(file.getPath());
+          LOG.info("Meta-Info for : " + file.getPath() + " changed. CachedModificationTime: "
+              + fileInfo.modificationTime + ", CurrentModificationTime: "
+              + file.getModificationTime()
+              + ", CachedLength: " + fileInfo.size + ", CurrentLength: " + file.getLen());
+        }
+      } else {
+        LOG.info("Info not cached for path: " + file.getPath());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Info not cached for path: " + file.getPath());
+        }
+      }
+      return null;
+    }
   }
 
   /**
@@ -442,13 +506,18 @@ public void run() {
     private final FileStatus file;
     private final long blockSize;
     private final BlockLocation[] locations;
+    private final FileInfo fileInfo;
+    private Iterable<StripeInformation> stripes;
+    private FileMetaInfo fileMetaInfo;
+
     SplitGenerator(Context context, FileSystem fs,
-                   FileStatus file) throws IOException {
+                   FileStatus file, FileInfo fileInfo) throws IOException {
       this.context = context;
       this.fs = fs;
       this.file = file;
       this.blockSize = file.getBlockSize();
+      this.fileInfo = fileInfo;
       locations = context.shims.getLocations(fs, file);
     }
@@ -547,15 +616,15 @@ void createSplit(long offset, long length, FileMetaInfo fileMetaInfo) throws IOE
     public void run() {
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CREATE_ORC_SPLITS);
       try {
-        Reader orcReader = OrcFile.createReader(fs, file.getPath());
+        populateAndCacheStripeDetails();
         long currentOffset = -1;
         long currentLength = 0;
-        for(StripeInformation stripe: orcReader.getStripes()) {
+        for(StripeInformation stripe: stripes) {
           // if we are working on a stripe, over the min stripe size, and
           // crossed a block boundary, cut the input split here.
           if (currentOffset != -1 && currentLength > context.minSize &&
               (currentOffset / blockSize != stripe.getOffset() / blockSize)) {
-            createSplit(currentOffset, currentLength, orcReader.getFileMetaInfo());
+            createSplit(currentOffset, currentLength, fileMetaInfo);
             currentOffset = -1;
           }
           // if we aren't building a split, start a new one.
@@ -566,12 +635,12 @@ public void run() {
            currentLength += stripe.getLength();
          }
          if (currentLength >= context.maxSize) {
-            createSplit(currentOffset, currentLength, orcReader.getFileMetaInfo());
+            createSplit(currentOffset, currentLength, fileMetaInfo);
            currentOffset = -1;
          }
        }
        if (currentOffset != -1) {
-          createSplit(currentOffset, currentLength, orcReader.getFileMetaInfo());
+          createSplit(currentOffset, currentLength, fileMetaInfo);
        }
      } catch (Throwable th) {
        synchronized (context.errors) {
@@ -580,12 +649,48 @@ public void run() {
       }
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CREATE_ORC_SPLITS);
     }
+
+
+
+
+    private void populateAndCacheStripeDetails() {
+      try {
+        Reader orcReader;
+        boolean found = false;
+        if (fileInfo != null) {
+          found = true;
+          stripes = fileInfo.stripeInfos;
+          fileMetaInfo = fileInfo.fileMetaInfo;
+          // For multiple runs, in case sendSplitsInFooter changes
+          if (fileMetaInfo == null && context.footerInSplits) {
+            orcReader = OrcFile.createReader(fs, file.getPath());
+            fileInfo.fileMetaInfo = orcReader.getFileMetaInfo();
+          }
+        }
+        if (!found) {
+          orcReader = OrcFile.createReader(fs, file.getPath());
+          stripes = orcReader.getStripes();
+          fileMetaInfo = context.footerInSplits ? orcReader.getFileMetaInfo() : null;
+          if (context.cacheStripeDetails) {
+            // Populate into cache.
+            Context.footerCache.put(file.getPath(),
+                new FileInfo(file.getModificationTime(), file.getLen(), stripes, fileMetaInfo));
+          }
+        }
+      } catch (Throwable th) {
+        synchronized (context.errors) {
+          context.errors.add(th);
+        }
+      }
+    }
+
   }
 
   @Override
   public InputSplit[] getSplits(JobConf job,
                                 int numSplits) throws IOException {
     // use threads to resolve directories into splits
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
     Context context = new Context(job);
     for(Path dir: getInputPaths(job)) {
       FileSystem fs = dir.getFileSystem(job);
@@ -607,6 +712,32 @@ public void run() {
     }
     InputSplit[] result = new InputSplit[context.splits.size()];
     context.splits.toArray(result);
+    if (context.cacheStripeDetails) {
+      LOG.info("FooterCacheHitRatio: " + context.cacheHitCounter.get() + "/"
+          + context.numFilesCounter.get());
+    }
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
     return result;
   }
-}
+
+  /**
+   * FileInfo.
+   *
+   * Stores information relevant to split generation for an ORC File.
+   *
+   */
+  private static class FileInfo {
+    long modificationTime;
+    long size;
+    Iterable<StripeInformation> stripeInfos;
+    FileMetaInfo fileMetaInfo;
+
+    FileInfo(long modificationTime, long size, Iterable<StripeInformation> stripeInfos,
+        FileMetaInfo fileMetaInfo) {
+      this.modificationTime = modificationTime;
+      this.size = size;
+      this.stripeInfos = stripeInfos;
+      this.fileMetaInfo = fileMetaInfo;
+    }
+  }
+}
\ No newline at end of file
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 65eb510..637134e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -19,6 +19,7 @@
  */
 public class OrcSplit extends FileSplit {
   private Reader.FileMetaInfo fileMetaInfo;
+  private boolean hasFooter;
 
   protected OrcSplit(){
     //The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
@@ -31,6 +32,7 @@ public OrcSplit(Path path, long offset, long length, String[] hosts,
       FileMetaInfo fileMetaInfo) {
     super(path, offset, length, hosts);
     this.fileMetaInfo = fileMetaInfo;
+    hasFooter = this.fileMetaInfo != null;
   }
 
   @Override
@@ -38,16 +40,22 @@ public void write(DataOutput out) throws IOException {
     //serialize path, offset, length using FileSplit
     super.write(out);
 
-    //serialize FileMetaInfo fields
-    Text.writeString(out, fileMetaInfo.compressionType);
-    WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
-
-    //serialize FileMetaInfo field footer
-    ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
-    footerBuff.reset();
-    //write length of buffer
-    WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
-    out.write(footerBuff.array(), footerBuff.position(), footerBuff.limit() - footerBuff.position());
+    // Whether footer information follows.
+    out.writeBoolean(hasFooter);
+
+    if (hasFooter) {
+      // serialize FileMetaInfo fields
+      Text.writeString(out, fileMetaInfo.compressionType);
+      WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
+
+      // serialize FileMetaInfo field footer
+      ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
+      footerBuff.reset();
+      // write length of buffer
+      WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
+      out.write(footerBuff.array(), footerBuff.position(),
+          footerBuff.limit() - footerBuff.position());
+    }
   }
 
   @Override
@@ -55,20 +63,27 @@ public void readFields(DataInput in) throws IOException {
     //deserialize path, offset, length using FileSplit
     super.readFields(in);
 
-    //deserialize FileMetaInfo fields
-    String compressionType = Text.readString(in);
-    int bufferSize = WritableUtils.readVInt(in);
+    hasFooter = in.readBoolean();
+
+    if (hasFooter) {
+      // deserialize FileMetaInfo fields
+      String compressionType = Text.readString(in);
+      int bufferSize = WritableUtils.readVInt(in);
 
-    //deserialize FileMetaInfo field footer
-    int footerBuffSize = WritableUtils.readVInt(in);
-    ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
-    in.readFully(footerBuff.array(), 0, footerBuffSize);
+      // deserialize FileMetaInfo field footer
+      int footerBuffSize = WritableUtils.readVInt(in);
+      ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
+      in.readFully(footerBuff.array(), 0, footerBuffSize);
 
-    fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, footerBuff);
+      fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, footerBuff);
+    }
   }
 
   public FileMetaInfo getFileMetaInfo(){
     return fileMetaInfo;
   }
+
+  public boolean hasFooter() {
+    return hasFooter;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index 3ba17d8..d904c44 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -153,11 +153,15 @@ public VectorizedOrcInputFormat() {
       //If CombineHiveInputFormat is used, it works with FileSplit and not OrcSplit
       reader = OrcFile.createReader(fs, path);
     } else {
-      //We have OrcSplit, which has footer metadata cached, so used the appropriate reader
+      //We have OrcSplit, which may have footer metadata cached, so use the appropriate reader
      //constructor
       OrcSplit orcSplit = (OrcSplit) fSplit;
-      FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
-      reader = OrcFile.createReader(fs, path, fMetaInfo);
+      if (orcSplit.hasFooter()) {
+        FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
+        reader = OrcFile.createReader(fs, path, fMetaInfo);
+      } else {
+        reader = OrcFile.createReader(fs, path);
+      }
     }
 
     return new VectorizedOrcRecordReader(reader, conf, fSplit);
diff --git ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index a797b9d..0bd5e79 100644
--- ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -65,6 +65,8 @@
   public static final String LOAD_HASHTABLE = "LoadHashtable";
 
   public static final String INIT_ORC_RECORD_READER = "OrcRecordReaderInit";
   public static final String CREATE_ORC_SPLITS = "OrcCreateSplits";
+  public static final String ORC_GET_SPLITS = "OrcGetSplits";
+  public static final String ORC_GET_BLOCK_LOCATIONS = "OrcGetBlockLocations";
 
   protected static final ThreadLocal<PerfLogger> perfLogger = new ThreadLocal<PerfLogger>();
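
Note (outside the patch): the three new properties read in Context's constructor are ordinary Configuration keys, so a client building its own JobConf can set them directly before calling getSplits. A minimal sketch of how that might look, assuming a job that reads ORC data through OrcInputFormat; the class name and the chosen values below are illustrative only, not mandated by the patch:

    import org.apache.hadoop.mapred.JobConf;

    public class OrcSplitConfExample {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Ship the ORC footer inside each OrcSplit so tasks can skip re-reading it.
        job.setBoolean("hive.orc.splits.include.file.footer", true);
        // Capacity of the stripe-details cache; a value <= 0 disables caching.
        job.setInt("hive.orc.cache.stripe.details.size", 10000);
        // Number of threads used to compute splits.
        job.setInt("hive.orc.compute.splits.num.threads", 10);
      }
    }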
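
Also outside the patch, for readers who want the caching idea in isolation: split generation keeps one process-wide Guava cache keyed by file Path, built with soft values so entries can be reclaimed under memory pressure, and an entry is trusted only while the file's modification time and length are unchanged. A self-contained sketch of that pattern under those assumptions (FooterCacheSketch and its members are made-up names, not part of the patch):

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    class FooterCacheSketch {
      static class Entry {
        final long modificationTime;
        final long size;
        Entry(long modificationTime, long size) {
          this.modificationTime = modificationTime;
          this.size = size;
        }
      }

      private final Cache<Path, Entry> cache = CacheBuilder.newBuilder()
          .initialCapacity(10000)
          .softValues()   // let the GC drop entries under memory pressure
          .build();

      Entry lookup(FileStatus file) {
        Entry cached = cache.getIfPresent(file.getPath());
        if (cached == null) {
          return null;
        }
        if (cached.modificationTime == file.getModificationTime() && cached.size == file.getLen()) {
          return cached;   // still valid
        }
        cache.invalidate(file.getPath());   // file changed since it was cached
        return null;
      }

      void remember(FileStatus file) {
        cache.put(file.getPath(), new Entry(file.getModificationTime(), file.getLen()));
      }
    }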