diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 12a022c590..75339a8614 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2044,6 +2044,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_CLASSLOADER_SHADE_PREFIX("hive.classloader.shade.prefix", "", "During reflective instantiation of a class\n" +
         "(input, output formats, serde etc.), when classloader throws ClassNotFoundException, as a fallback this\n" +
         "shade prefix will be used before class reference and retried."),
+    HIVE_ORC_ACID_DIR_CACHE_ENABLED("hive.orc.acid.dir.cache.enabled",
+        false, "Whether to enable the directory cache for ACID tables. Effective only " +
+        "in BI split strategy mode."),
     HIVE_ORC_MS_FOOTER_CACHE_ENABLED("hive.orc.splits.ms.footer.cache.enabled", false,
         "Whether to enable using file metadata cache in metastore for ORC file footers."),
     HIVE_ORC_MS_FOOTER_CACHE_PPD("hive.orc.splits.ms.footer.cache.ppd.enabled", true,
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 76984abd0a..1c2e63f8f7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -45,6 +45,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -637,6 +639,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
     private final SearchArgument sarg;
     private final AcidOperationalProperties acidOperationalProperties;
     private final boolean isAcid;
+    private final boolean dirCacheEnabled;
     private final boolean isVectorMode;
 
     Context(Configuration conf) throws IOException {
@@ -652,6 +655,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
         throws IOException {
       this.conf = conf;
       this.isAcid = AcidUtils.isFullAcidScan(conf);
+      this.dirCacheEnabled = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_ACID_DIR_CACHE_ENABLED);
       this.isVectorMode = Utilities.getIsVectorized(conf);
       this.forceThreadpool = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
       this.sarg = ConvertAstToSearchArg.createFromConf(conf);
@@ -731,6 +735,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
       writeIdList = value == null ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(value);
       LOG.info("Context:: " +
           "isAcid: {} " +
+          "dirCacheEnabled: {} " +
           "isVectorMode: {} " +
           "sarg: {} " +
           "minSplitSize: {} " +
@@ -746,6 +751,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
           "isTransactionalTable: {} " +
           "txnProperties: {} ",
           isAcid,
+          dirCacheEnabled,
           isVectorMode,
           sarg,
           minSize,
@@ -1208,6 +1214,13 @@ public String toString() {
    * To parallelize file generator use "mapreduce.input.fileinputformat.list-status.num-threads"
    */
  static final class FileGenerator implements Callable<AcidDirInfo> {
+
+    private static final Cache<String, AcidUtils.Directory> dirCache =
+        CacheBuilder.newBuilder()
+            .expireAfterWrite(30, TimeUnit.MINUTES)
+            .softValues()
+            .build();
+
     private final Context context;
     private final FileSystem fs;
     /**
@@ -1252,6 +1265,51 @@ public AcidDirInfo run() throws Exception {
       }
     }
 
+    private AcidUtils.Directory getAcidState() throws IOException {
+      return AcidUtils.getAcidState(fs, dir, context.conf, context.writeIdList,
+          useFileIds, true, null, true);
+    }
+
+    private void addToDirCache(String key, AcidUtils.Directory dirInfo) {
+      // For now, cache only when there is a base and no delta files.
+      if (context.dirCacheEnabled && dirInfo != null && dirInfo.getBaseDirectory() != null
+          && dirInfo.getCurrentDirectories().isEmpty()) {
+        dirCache.put(key, dirInfo);
+      }
+    }
+
+    private AcidUtils.Directory getAcidStateFromCache() throws IOException {
+      String txnString = context.conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
+      ValidWriteIdList validWriteIdList
+          = (txnString == null) ? new ValidReaderWriteIdList()
+          : new ValidReaderWriteIdList(txnString);
+
+      // Keying on the high watermark takes care of different base_x dirs
+      // (e.g. multiple insert-overwrites).
+      String key = validWriteIdList.getHighWatermark() + "_" + dir.toString();
+
+      // In case of open/aborted txns, recompute dirInfo.
+      long[] exceptions = validWriteIdList.getInvalidWriteIds();
+      boolean recompute = (exceptions != null && exceptions.length > 0);
+
+      if (recompute) {
+        dirCache.invalidate(key);
+        AcidUtils.Directory dirInfo = getAcidState();
+        addToDirCache(key, dirInfo);
+        return dirInfo;
+      }
+
+      AcidUtils.Directory dirInfo = dirCache.getIfPresent(key);
+      if (dirInfo == null) {
+        dirInfo = getAcidState();
+        addToDirCache(key, dirInfo);
+      } else {
+        LOG.debug("From cache: key={}, size={}", key, dirCache.size());
+      }
+      return dirInfo;
+    }
+
     private AcidDirInfo callInternal() throws IOException {
       if (context.acidOperationalProperties != null
           && context.acidOperationalProperties.isInsertOnly()) {
@@ -1272,8 +1330,12 @@ private AcidDirInfo callInternal() throws IOException {
             Lists.newArrayList(), Lists.newArrayList(), null), baseFiles, new ArrayList<>());
       }
       //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
-      AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
-          fs, dir, context.conf, context.writeIdList, useFileIds, true, null, true);
+
+      // The cache is used only for ACID tables read with the BI split strategy.
+      AcidUtils.Directory dirInfo =
+          (context.dirCacheEnabled && context.splitStrategyKind == SplitStrategyKind.BI && context.isAcid) ?
+              getAcidStateFromCache() : getAcidState();
+
       // find the base files (original or new style)
       List<HdfsFileStatusWithId> baseFiles = new ArrayList<>();
       if (dirInfo.getBaseDirectory() == null) {
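
Note for reviewers: the patch relies on three Guava Cache behaviors — entries expire 30 minutes after write, values are held through SoftReferences that the GC may reclaim under memory pressure, and entries are explicitly invalidated when the ValidWriteIdList reports open/aborted write IDs. A minimal standalone sketch of that lifecycle follows; the class name, key, and string values are illustrative stand-ins, not part of the patch (the real cache stores AcidUtils.Directory values).

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Hypothetical demo class, not part of the patch.
public class DirCacheDemo {
  public static void main(String[] args) {
    // Same builder settings as FileGenerator.dirCache: entries expire
    // 30 minutes after write, and values are softly referenced.
    Cache<String, String> cache = CacheBuilder.newBuilder()
        .expireAfterWrite(30, TimeUnit.MINUTES)
        .softValues()
        .build();

    // Key shape mirrors getAcidStateFromCache(): highWatermark + "_" + dir.
    String key = 42L + "_" + "/warehouse/tbl";
    cache.put(key, "dir listing with base, no deltas");

    System.out.println(cache.getIfPresent(key)); // hit: prints the cached value
    cache.invalidate(key);                       // what the patch does when open/aborted txns exist
    System.out.println(cache.getIfPresent(key)); // miss: prints null, caller recomputes
  }
}

The put/getIfPresent/invalidate calls above are the only Cache operations the patch uses, which is why a plain Cache (rather than a LoadingCache) is sufficient here.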