diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 12a022c590..85e540d3f5 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2659,6 +2659,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "are not hidden by the INSERT OVERWRITE."),
     HIVE_TXN_STATS_ENABLED("hive.txn.stats.enabled", true,
         "Whether Hive supports transactional stats (accurate stats for transactional tables)"),
+    HIVE_TXN_ACID_DIR_CACHE_ENABLED("hive.txn.acid.dir.cache.enabled",
+        false, "Whether to enable dir cache for ACID tables."),
     HIVE_TXN_READONLY_ENABLED("hive.txn.readonly.enabled", false,
         "Enables read-only transaction classification and related optimizations"),
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 2f5ec5270c..cbfc8ba9aa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -35,10 +35,13 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import com.google.common.base.Strings;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -163,6 +166,12 @@ public boolean accept(Path path) {
   public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
   public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
 
+  private static final Cache<String, DirInfoValue>
+      dirCache = CacheBuilder.newBuilder()
+          .expireAfterWrite(30, TimeUnit.MINUTES)
+          .softValues()
+          .build();
+
   /**
    * A write into a non-aicd table produces files like 0000_0 or 0000_0_copy_1
    * (Unless via Load Data statement)
@@ -3075,4 +3084,101 @@ public static TxnType getTxnType(Configuration conf, ASTNode tree) {
         astSearcher.simpleBreadthFirstSearch(tree, pattern) != null))
         ? TxnType.READ_ONLY : TxnType.DEFAULT;
   }
+
+  /**
+   * Tries to get directory details from the cache. For now, a directory is
+   * cached only when a base directory is present and no deltas exist. This
+   * should be used only with the BI split strategy and only for ACID tables.
+   *
+   * @param fileSystem file system instance
+   * @param candidateDirectory the partition directory to analyze
+   * @param conf the configuration
+   * @param writeIdList the list of write ids that we are reading
+   * @param useFileIds whether file ids should be used; may be updated
+   * @param ignoreEmptyFiles whether empty files should be skipped
+   * @param tblproperties table properties
+   * @param generateDirSnapshots whether to generate directory snapshots
+   * @return directory state
+   * @throws IOException on errors
+   */
+  public static Directory getAcidStateFromCache(FileSystem fileSystem,
+      Path candidateDirectory, Configuration conf,
+      ValidWriteIdList writeIdList, Ref<Boolean> useFileIds,
+      boolean ignoreEmptyFiles, Map<String, String> tblproperties,
+      boolean generateDirSnapshots) throws IOException {
+
+    boolean dirCacheEnabled = HiveConf.getBoolVar(conf,
+        ConfVars.HIVE_TXN_ACID_DIR_CACHE_ENABLED);
+
+    if (!dirCacheEnabled) {
+      LOG.debug("dirCache is not enabled");
+      return getAcidState(fileSystem, candidateDirectory, conf, writeIdList,
+          useFileIds, ignoreEmptyFiles, tblproperties, generateDirSnapshots);
+    }
+
+    /*
+     * Cache only the simple case, where a base directory exists and no deltas
+     * are present. On changes, the entry is invalidated based on the
+     * open/aborted txn list. ValidWriteIdList::getTableName returns the fully
+     * qualified "db.tableName", so the key is unique per table.
+     */
+    // dbName + tableName + dir
+    String key = writeIdList.getTableName() + "_" + candidateDirectory.toString();
+    DirInfoValue value = dirCache.getIfPresent(key);
+
+    // in case of open/aborted txns, recompute dirInfo
+    long[] exceptions = writeIdList.getInvalidWriteIds();
+    boolean recompute = (exceptions != null && exceptions.length > 0);
+
+    if (recompute) {
+      LOG.debug("Invalidating cache entry for key: {}", key);
+      dirCache.invalidate(key);
+      value = null;
+    }
+
+    if (value != null) {
+      // double check writeIds
+      if (!value.getTxnString().equalsIgnoreCase(writeIdList.writeToString())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("writeIdList: {} from cache: {} does not match for key: {}",
+              writeIdList.writeToString(), value.getTxnString(), key);
+        }
+        recompute = true;
+      }
+    }
+
+    // compute and add to cache
+    if (recompute || (value == null)) {
+      Directory dirInfo = getAcidState(fileSystem, candidateDirectory, conf,
+          writeIdList, useFileIds, ignoreEmptyFiles, tblproperties,
+          generateDirSnapshots);
+      value = new DirInfoValue(writeIdList.writeToString(), dirInfo);
+
+      // cache only the base-without-deltas case for now
+      if (value.dirInfo != null
+          && value.dirInfo.getBaseDirectory() != null
+          && value.dirInfo.getCurrentDirectories().isEmpty()) {
+        dirCache.put(key, value);
+      }
+    } else {
+      LOG.debug("Got {} from cache, cache size: {}", key, dirCache.size());
+    }
+    return value.getDirInfo();
+  }
+
+  static class DirInfoValue {
+    private final String txnString;
+    private final AcidUtils.Directory dirInfo;
+
+    DirInfoValue(String txnString, AcidUtils.Directory dirInfo) {
+      this.txnString = txnString;
+      this.dirInfo = dirInfo;
+    }
+
+    String getTxnString() {
+      return txnString;
+    }
+
+    AcidUtils.Directory getDirInfo() {
+      return dirInfo;
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 76984abd0a..1bd1a510fa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -45,6 +45,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -1252,6 +1254,16 @@ public AcidDirInfo run() throws Exception {
       }
     }
 
+    private AcidUtils.Directory getAcidState() throws IOException {
+      if (context.isAcid && context.splitStrategyKind == SplitStrategyKind.BI) {
+        return AcidUtils.getAcidStateFromCache(fs, dir, context.conf,
+            context.writeIdList, useFileIds, true, null, true);
+      } else {
+        return AcidUtils.getAcidState(fs, dir, context.conf, context.writeIdList,
+            useFileIds, true, null, true);
+      }
+    }
+
     private AcidDirInfo callInternal() throws IOException {
       if (context.acidOperationalProperties != null
           && context.acidOperationalProperties.isInsertOnly()) {
@@ -1272,8 +1284,9 @@ private AcidDirInfo callInternal() throws IOException {
             Lists.newArrayList(), Lists.newArrayList(), null), baseFiles, new ArrayList<>());
       }
       //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
-      AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
-          fs, dir, context.conf, context.writeIdList, useFileIds, true, null, true);
+
+      AcidUtils.Directory dirInfo = getAcidState();
+
       // find the base files (original or new style)
       List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
       if (dirInfo.getBaseDirectory() == null) {
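
Note: the caching in this patch boils down to the standard Guava pattern of a TTL-bounded, soft-valued cache keyed per table directory and guarded by the writeId snapshot string. Below is a minimal, self-contained sketch of that pattern; DirCacheSketch, Entry, getDirState(), and loadDirState() are hypothetical stand-ins for AcidUtils, DirInfoValue, getAcidStateFromCache(), and getAcidState(), not part of the patch, and the sketch omits the patch's restriction of caching only the base-without-deltas case.

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class DirCacheSketch {

  // Pairs the cached directory state with the writeId snapshot it was computed under.
  static final class Entry {
    final String txnString; // corresponds to writeIdList.writeToString() in the patch
    final String dirState;  // stands in for AcidUtils.Directory

    Entry(String txnString, String dirState) {
      this.txnString = txnString;
      this.dirState = dirState;
    }
  }

  // The 30-minute TTL bounds staleness; soft values let the GC reclaim entries
  // under memory pressure rather than the cache pinning directory listings.
  private static final Cache<String, Entry> CACHE = CacheBuilder.newBuilder()
      .expireAfterWrite(30, TimeUnit.MINUTES)
      .softValues()
      .build();

  static String getDirState(String qualifiedTableName, String dir,
      String txnString, boolean hasOpenOrAbortedTxns) {
    // "db.tableName" + dir, mirroring the key construction in the patch
    String key = qualifiedTableName + "_" + dir;

    if (hasOpenOrAbortedTxns) {
      // open/aborted write ids change visibility; drop the entry and recompute
      CACHE.invalidate(key);
    } else {
      Entry cached = CACHE.getIfPresent(key);
      // serve from cache only when the snapshot string matches exactly
      if (cached != null && cached.txnString.equalsIgnoreCase(txnString)) {
        return cached.dirState;
      }
    }

    Entry fresh = new Entry(txnString, loadDirState(dir)); // the expensive call
    CACHE.put(key, fresh);
    return fresh.dirState;
  }

  // hypothetical stand-in for the expensive AcidUtils.getAcidState() listing
  private static String loadDirState(String dir) {
    return "state-of-" + dir;
  }

  public static void main(String[] args) {
    // the second call with the same snapshot is served from the cache
    System.out.println(getDirState("db.tbl", "/warehouse/db/tbl/p=1", "snap-1", false));
    System.out.println(getDirState("db.tbl", "/warehouse/db/tbl/p=1", "snap-1", false));
  }
}

The snapshot-string comparison is what keeps the cache safe: a stale entry can only ever be returned to a reader whose ValidWriteIdList serializes to exactly the string the entry was computed under, so a changed snapshot always falls through to a recompute.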