diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index cbe0d04c83..7af4531565 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -99,6 +99,9 @@
   // Keeps track of scratch directories created for different scheme/authority
   private final Map<String, Path> fsScratchDirs = new HashMap<String, Path>();
 
+  // keeps track of the result cache dir for the query; cleaned up later by context cleanup
+  private Path fsResultCacheDirs = null;
+
   private Configuration conf;
   protected int pathid = 10000;
   protected ExplainConfiguration explainConfig = null;
@@ -343,6 +346,7 @@ protected Context(Context ctx) {
     this.localScratchDir = ctx.localScratchDir;
     this.scratchDirPermission = ctx.scratchDirPermission;
     this.fsScratchDirs.putAll(ctx.fsScratchDirs);
+    this.fsResultCacheDirs = ctx.fsResultCacheDirs;
     this.conf = ctx.conf;
     this.pathid = ctx.pathid;
     this.explainConfig = ctx.explainConfig;
@@ -372,6 +376,14 @@ protected Context(Context ctx) {
     return fsScratchDirs;
   }
 
+  public void setFsResultCacheDirs(Path fsResultCacheDirs) {
+    this.fsResultCacheDirs = fsResultCacheDirs;
+  }
+
+  public Path getFsResultCacheDirs() {
+    return this.fsResultCacheDirs;
+  }
+
   public Map<LoadTableDesc, WriteEntity> getLoadTableOutputMap() {
     return loadTableOutputMap;
   }
@@ -625,17 +637,43 @@ private Path getExternalScratchDir(URI extURI) {
     return getStagingDir(new Path(extURI.getScheme(), extURI.getAuthority(), extURI.getPath()), !isExplainSkipExecution());
   }
 
+  /**
+   * Remove the result cache directory created for this query, if one was set.
+   */
+  public void removeResultCacheDir() {
+    if (this.fsResultCacheDirs != null) {
+      try {
+        Path p = this.fsResultCacheDirs;
+        FileSystem fs = p.getFileSystem(conf);
+        LOG.debug("Deleting result cache dir: {}", p);
+        fs.delete(p, true);
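+        // mirror removeScratchDir(): the dir may have been registered for
+        // delete-on-exit when it was created, so cancel that registration too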
+        fs.cancelDeleteOnExit(p);
+      } catch (Exception e) {
+        LOG.warn("Error removing result cache dir: "
+            + StringUtils.stringifyException(e));
+      }
+    }
+  }
+
   /**
    * Remove any created scratch directories.
    */
   public void removeScratchDir() {
+    String resultCacheDir = null;
+    if (this.fsResultCacheDirs != null) {
+      resultCacheDir = this.fsResultCacheDirs.toUri().getPath();
+    }
     for (Map.Entry<String, Path> entry : fsScratchDirs.entrySet()) {
       try {
         Path p = entry.getValue();
+        if (resultCacheDir == null || !p.toUri().getPath().contains(resultCacheDir)) {
+          // delete only the paths that are not under the result cache dir;
+          // that one is taken care of by removeResultCacheDir()
           FileSystem fs = p.getFileSystem(conf);
           LOG.debug("Deleting scratch dir: {}", p);
           fs.delete(p, true);
           fs.cancelDeleteOnExit(p);
+        }
       } catch (Exception e) {
         LOG.warn("Error Removing Scratch: "
             + StringUtils.stringifyException(e));
@@ -774,21 +812,25 @@ public void setResDir(Path resDir) {
     resDirPaths = null;
   }
 
-  public void clear() throws IOException {
+  public void clear() throws IOException {
+    this.clear(true);
+  }
+
+  public void clear(boolean deleteResultDir) throws IOException {
     // First clear the other contexts created by this query
     for (Context subContext : rewrittenStatementContexts) {
       subContext.clear();
     }
     // Then clear this context
-    if (resDir != null) {
-      try {
-        FileSystem fs = resDir.getFileSystem(conf);
-        LOG.debug("Deleting result dir: {}", resDir);
-        fs.delete(resDir, true);
-      } catch (IOException e) {
-        LOG.info("Context clear error: " + StringUtils.stringifyException(e));
+    if (resDir != null) {
+      try {
+        FileSystem fs = resDir.getFileSystem(conf);
+        LOG.debug("Deleting result dir: {}", resDir);
+        fs.delete(resDir, true);
+      } catch (IOException e) {
+        LOG.info("Context clear error: " + StringUtils.stringifyException(e));
+      }
     }
-    }
 
     if (resFile != null) {
       try {
@@ -799,6 +841,9 @@ public void clear() throws IOException {
         LOG.info("Context clear error: " + StringUtils.stringifyException(e));
       }
     }
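+    // the Driver passes deleteResultDir=false when the query read its results
+    // from the cache, so that files shared with the results cache are not deleted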
+    if (deleteResultDir) {
+      removeResultCacheDir();
+    }
     removeMaterializedCTEs();
     removeScratchDir();
     originalTracker = null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index cac14a6ab8..4f14fa59c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2835,7 +2835,14 @@ private void releasePlan() {
   private void releaseContext() {
     try {
       if (ctx != null) {
-        ctx.clear();
+        boolean deleteResultDir = true;
+        // don't let the context delete result dirs and scratch dirs if the query used a cached result
+        if (this.cacheUsage != null
+            && this.cacheUsage.getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) {
+          deleteResultDir = false;
+        }
+        ctx.clear(deleteResultDir);
         if (ctx.getHiveLocks() != null) {
           hiveLocks.addAll(ctx.getHiveLocks());
           ctx.setHiveLocks(null);
@@ -2931,10 +2938,10 @@ public void close() {
       lDrvState.abort();
     }
     releasePlan();
+    releaseContext();
     releaseCachedResult();
     releaseFetchTask();
     releaseResStream();
-    releaseContext();
     lDrvState.driverState = DriverState.CLOSED;
   } finally {
     lDrvState.stateLock.unlock();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index 0b7166b7e8..517ead8c45 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -191,6 +192,7 @@ public void setQueryTime(long queryTime) {
     private QueryInfo queryInfo;
     private FetchWork fetchWork;
     private Path cachedResultsPath;
+    private Set<FileStatus> cachedResultPaths;
 
     // Cache administration
     private long size;
@@ -275,8 +277,9 @@ private String getQueryText() {
 
     public FetchWork getFetchWork() {
       // FetchWork's sink is used to hold results, so each query needs a separate copy of FetchWork
-      FetchWork fetch = new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
+      FetchWork fetch = new FetchWork(fetchWork.getTblDir(), fetchWork.getTblDesc(), fetchWork.getLimit());
       fetch.setCachedResult(true);
+      fetch.setFilesToFetch(this.cachedResultPaths);
       return fetch;
     }
@@ -409,6 +412,10 @@ public static QueryResultsCache getInstance() {
     return instance;
   }
 
+  public Path getCacheDirPath() {
+    return cacheDirPath;
+  }
+
   /**
    * Check if the cache contains an entry for the requested LookupInfo.
    * @param request
@@ -517,30 +524,26 @@ public CacheEntry addToCache(QueryInfo queryInfo, ValidTxnWriteIdList txnWriteId
    * @return
    */
   public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
-    String queryText = cacheEntry.getQueryText();
-    boolean dataDirMoved = false;
     Path queryResultsPath = null;
     Path cachedResultsPath = null;
 
     try {
-      boolean requiresMove = true;
+      // if we get here, the file sink op should have created the files to fetch from
+      assert (fetchWork.getFilesToFetch() != null);
+
+      boolean requiresCaching = true;
       queryResultsPath = fetchWork.getTblDir();
       FileSystem resultsFs = queryResultsPath.getFileSystem(conf);
-      long resultSize;
-      if (resultsFs.exists(queryResultsPath)) {
-        ContentSummary cs = resultsFs.getContentSummary(queryResultsPath);
-        resultSize = cs.getLength();
-      } else {
-        // No actual result directory, no need to move anything.
-        cachedResultsPath = zeroRowsPath;
-        // Even if there are no results to move, at least check that we have permission
-        // to check the existence of zeroRowsPath, or the read using the cache will fail.
-        // A failure here will cause this query to not be added to the cache.
-        FileSystem cacheFs = cachedResultsPath.getFileSystem(conf);
-        boolean fakePathExists = cacheFs.exists(zeroRowsPath);
-
-        resultSize = 0;
-        requiresMove = false;
+
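+      // results are now cached in place: sum the sizes of the files written by
+      // the file sink instead of measuring (and moving) a whole directory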
+      long resultSize = 0;
+      for (FileStatus fs : fetchWork.getFilesToFetch()) {
+        if (resultsFs.exists(fs.getPath())) {
+          resultSize += fs.getLen();
+        } else {
+          // No actual result file, no need to cache anything.
+          requiresCaching = false;
+          break;
+        }
       }
 
       if (!shouldEntryBeAdded(cacheEntry, resultSize)) {
@@ -555,20 +558,22 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
         return false;
       }
 
-      if (requiresMove) {
-        // Move the query results to the query cache directory.
-        cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
-        dataDirMoved = true;
+      if (requiresCaching) {
+        cacheEntry.cachedResultPaths = new HashSet<>();
+        for (FileStatus fs : fetchWork.getFilesToFetch()) {
+          cacheEntry.cachedResultPaths.add(fs);
+        }
+        LOG.info("Cached query result paths located at {} (size {}) for query '{}'",
+            queryResultsPath, resultSize, cacheEntry.getQueryText());
       }
-      LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
-          queryResultsPath, cachedResultsPath, resultSize, queryText);
 
       // Create a new FetchWork to reference the new cache location.
       FetchWork fetchWorkForCache =
-          new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
+          new FetchWork(fetchWork.getTblDir(), fetchWork.getTblDesc(), fetchWork.getLimit());
       fetchWorkForCache.setCachedResult(true);
+      fetchWorkForCache.setFilesToFetch(fetchWork.getFilesToFetch());
       cacheEntry.fetchWork = fetchWorkForCache;
-      cacheEntry.cachedResultsPath = cachedResultsPath;
+      //cacheEntry.cachedResultsPath = cachedResultsPath;
       cacheEntry.size = resultSize;
       this.cacheSize += resultSize;
@@ -585,23 +590,10 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
       incrementMetric(MetricsConstant.QC_VALID_ENTRIES);
       incrementMetric(MetricsConstant.QC_TOTAL_ENTRIES_ADDED);
     } catch (Exception err) {
+      String queryText = cacheEntry.getQueryText();
       LOG.error("Failed to create cache entry for query results for query: " + queryText, err);
-
-      if (dataDirMoved) {
-        // If data was moved from original location to cache directory, we need to move it back!
-        LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath);
-        try {
-          FileSystem fs = cachedResultsPath.getFileSystem(conf);
-          fs.rename(cachedResultsPath, queryResultsPath);
-          cacheEntry.size = 0;
-          cacheEntry.cachedResultsPath = null;
-        } catch (Exception err2) {
-          String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText;
-          LOG.error(errMsg);
-          throw new RuntimeException(errMsg);
-        }
-      }
-
+      cacheEntry.size = 0;
+      cacheEntry.cachedResultsPath = null;
       // Invalidate the entry. Rely on query cleanup to remove from lookup.
       cacheEntry.invalidate();
       return false;
@@ -783,23 +775,6 @@ private boolean shouldEntryBeAdded(CacheEntry entry, long size) {
     return true;
   }
 
-  private Path moveResultsToCacheDirectory(Path queryResultsPath) throws IOException {
-    String dirName = UUID.randomUUID().toString();
-    Path cachedResultsPath = new Path(cacheDirPath, dirName);
-    FileSystem fs = cachedResultsPath.getFileSystem(conf);
-    try {
-      boolean resultsMoved = Hive.moveFile(conf, queryResultsPath, cachedResultsPath, false, false, false);
-      if (!resultsMoved) {
-        throw new IOException("Failed to move " + queryResultsPath + " to " + cachedResultsPath);
-      }
-    } catch (IOException err) {
-      throw err;
-    } catch (Exception err) {
-      throw new IOException("Error moving " + queryResultsPath + " to " + cachedResultsPath, err);
-    }
-    return cachedResultsPath;
-  }
-
   private boolean hasSpaceForCacheEntry(CacheEntry entry, long size) {
     if (maxCacheSize >= 0) {
       return (cacheSize + size) <= maxCacheSize;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 1df6094dd2..62803f2040 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1486,8 +1486,7 @@ public static boolean shouldAvoidRename(FileSinkDesc conf, Configuration hConf)
     // * query cache is disabled
     // * if it is select query
     if (conf != null && conf.getIsQuery() && conf.getFilesToFetch() != null
-        && HiveConf.getVar(hConf, ConfVars.HIVE_EXECUTION_ENGINE).equalsIgnoreCase("tez")
-        && !HiveConf.getBoolVar(hConf, ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
+        && HiveConf.getVar(hConf, ConfVars.HIVE_EXECUTION_ENGINE).equalsIgnoreCase("tez")) {
       return true;
     }
     return false;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 767147b000..d0e60e7fe7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -42,6 +42,7 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 import java.util.stream.Collectors;
@@ -7176,6 +7177,23 @@ private ExprNodeDesc getNotNullConstraintExpr(Table targetTable, Operator input
     return currUDF;
   }
 
+  private Path getDestinationFilePath(final String destinationFile, boolean isMmTable)
+      throws SemanticException {
+    if (this.isResultsCacheEnabled() && this.queryTypeCanUseCache()) {
+      assert (!isMmTable);
+      QueryResultsCache instance = QueryResultsCache.getInstance();
+      // QueryResultsCache should have been initialized by now
+      if (instance != null) {
+        Path resultCacheTopDir = instance.getCacheDirPath();
+        String dirName = UUID.randomUUID().toString();
+        Path resultDir = new Path(resultCacheTopDir, dirName);
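+        // remember the dir on the Context so that Context.clear() can remove it
+        // later via removeResultCacheDir() if the results do not end up cached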
+        this.ctx.setFsResultCacheDirs(resultDir);
+        return resultDir;
+      }
+    }
+    return new Path(destinationFile);
+  }
+
   @SuppressWarnings("nls")
   protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       throws SemanticException {
@@ -7440,7 +7458,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       isLocal = true;
       // fall through
     case QBMetaData.DEST_DFS_FILE: {
-      destinationPath = new Path(qbm.getDestFileForAlias(dest));
+      destinationPath = getDestinationFilePath(qbm.getDestFileForAlias(dest), isMmTable);
 
       // CTAS case: the file output format and serde are defined by the create
       // table command rather than taking the default value
@@ -15063,6 +15081,9 @@ private void useCachedResult(QueryResultsCache.CacheEntry cacheEntry, boolean ne
    * Some initial checks for a query to see if we can look this query up in the results cache.
    */
   private boolean queryTypeCanUseCache() {
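+    // this can now be invoked early, from getDestinationFilePath(), at a point
+    // where the QB or its parse info may not have been set up yet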
+    if (this.qb == null || this.qb.getParseInfo() == null) {
+      return false;
+    }
     if (this instanceof ColumnStatsSemanticAnalyzer) {
       // Column stats generates "select compute_stats() .." queries.
       // Disable caching for these.
@@ -15073,13 +15094,13 @@ private boolean queryTypeCanUseCache() {
       return false;
     }
 
-      if (qb.getParseInfo().isAnalyzeCommand()) {
-        return false;
-      }
+    if (qb.getParseInfo().isAnalyzeCommand()) {
+      return false;
+    }
 
-      if (qb.getParseInfo().hasInsertTables()) {
-        return false;
-      }
+    if (qb.getParseInfo().hasInsertTables()) {
+      return false;
+    }
 
     // HIVE-19096 - disable for explain analyze
     if (ctx.getExplainAnalyze() != null) {
diff --git a/ql/src/test/results/clientpositive/llap/results_cache_diff_fs.q.out b/ql/src/test/results/clientpositive/llap/results_cache_diff_fs.q.out
index ed6af87bcc..54a34517e7 100644
--- a/ql/src/test/results/clientpositive/llap/results_cache_diff_fs.q.out
+++ b/ql/src/test/results/clientpositive/llap/results_cache_diff_fs.q.out
@@ -3,12 +3,12 @@ PREHOOK: query: explain
 select count(*) from src a join src b on (a.key = b.key)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: explain
 select count(*) from src a join src b on (a.key = b.key)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
   Stage-0 depends on stages: Stage-1
@@ -107,11 +107,11 @@ STAGE PLANS:
 PREHOOK: query: select count(*) from src a join src b on (a.key = b.key)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select count(*) from src a join src b on (a.key = b.key)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 1028
 test.comment="Cache should be used for this query"
 PREHOOK: query: explain