diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index cbe0d04c83..e8475b77f1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -780,7 +780,8 @@ public void clear() throws IOException { subContext.clear(); } // Then clear this context - if (resDir != null) { + // TODO: should only be deleted if results weren't cached + /*if (resDir != null) { try { FileSystem fs = resDir.getFileSystem(conf); LOG.debug("Deleting result dir: {}", resDir); @@ -788,7 +789,7 @@ public void clear() throws IOException { } catch (IOException e) { LOG.info("Context clear error: " + StringUtils.stringifyException(e)); } - } + } */ if (resFile != null) { try { @@ -800,7 +801,7 @@ public void clear() throws IOException { } } removeMaterializedCTEs(); - removeScratchDir(); + //removeScratchDir(); originalTracker = null; setNeedLockMgr(false); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java index 0b7166b7e8..5ae28108cd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java @@ -48,6 +48,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -191,6 +192,7 @@ public void setQueryTime(long queryTime) { private QueryInfo queryInfo; private FetchWork fetchWork; private Path cachedResultsPath; + private Set cachedResultPaths; // Cache administration private long size; @@ -275,8 +277,9 @@ private String getQueryText() { public FetchWork getFetchWork() { // FetchWork's sink is used to hold results, so each query needs a separate copy of FetchWork - FetchWork fetch = new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit()); + FetchWork fetch = new FetchWork(fetchWork.getTblDir(), fetchWork.getTblDesc(), fetchWork.getLimit()); fetch.setCachedResult(true); + fetch.setFilesToFetch(this.cachedResultPaths); return fetch; } @@ -409,6 +412,10 @@ public static QueryResultsCache getInstance() { return instance; } + public Path getCacheDirPath() { + return cacheDirPath; + } + /** * Check if the cache contains an entry for the requested LookupInfo. * @param request @@ -523,24 +530,29 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) { Path cachedResultsPath = null; try { + // if we are here file sink op should have created files to fetch from + assert(fetchWork.getFilesToFetch() != null ); + boolean requiresMove = true; - queryResultsPath = fetchWork.getTblDir(); + queryResultsPath = Utilities.toTempPath(fetchWork.getTblDir()); FileSystem resultsFs = queryResultsPath.getFileSystem(conf); - long resultSize; - if (resultsFs.exists(queryResultsPath)) { - ContentSummary cs = resultsFs.getContentSummary(queryResultsPath); - resultSize = cs.getLength(); - } else { - // No actual result directory, no need to move anything. - cachedResultsPath = zeroRowsPath; - // Even if there are no results to move, at least check that we have permission - // to check the existence of zeroRowsPath, or the read using the cache will fail. - // A failure here will cause this query to not be added to the cache. - FileSystem cacheFs = cachedResultsPath.getFileSystem(conf); - boolean fakePathExists = cacheFs.exists(zeroRowsPath); - - resultSize = 0; - requiresMove = false; + + long resultSize = 0; + for(FileStatus fs:fetchWork.getFilesToFetch()) { + if(resultsFs.exists(fs.getPath())) { + resultSize += fs.getLen(); + } else { + // No actual result directory, no need to move anything. + cachedResultsPath = zeroRowsPath; + // Even if there are no results to move, at least check that we have permission + // to check the existence of zeroRowsPath, or the read using the cache will fail. + // A failure here will cause this query to not be added to the cache. + FileSystem cacheFs = cachedResultsPath.getFileSystem(conf); + boolean fakePathExists = cacheFs.exists(zeroRowsPath); + resultSize = 0; + requiresMove = false; + break; + } } if (!shouldEntryBeAdded(cacheEntry, resultSize)) { @@ -557,18 +569,26 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) { if (requiresMove) { // Move the query results to the query cache directory. - cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath); + //Set queryResultFiles = new HashSet<>(); + //if(fetchWork.getFilesToFetch() != null && !fetchWork.getFilesToFetch().isEmpty()) { + cacheEntry.cachedResultPaths = new HashSet<>(); + for(FileStatus fs:fetchWork.getFilesToFetch()) { + cacheEntry.cachedResultPaths.add(fs); + } + //} + //cachedResultsPath = moveResultsToCacheDirectory(queryResultFiles); dataDirMoved = true; } - LOG.info("Moved query results from {} to {} (size {}) for query '{}'", - queryResultsPath, cachedResultsPath, resultSize, queryText); + //LOG.info("Moved query results from {} to {} (size {}) for query '{}'", + // queryResultsPath, cachedResultsPath, resultSize, queryText); // Create a new FetchWork to reference the new cache location. FetchWork fetchWorkForCache = - new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit()); + new FetchWork(fetchWork.getTblDir(), fetchWork.getTblDesc(), fetchWork.getLimit()); fetchWorkForCache.setCachedResult(true); + fetchWorkForCache.setFilesToFetch(fetchWork.getFilesToFetch()); cacheEntry.fetchWork = fetchWorkForCache; - cacheEntry.cachedResultsPath = cachedResultsPath; + //cacheEntry.cachedResultsPath = cachedResultsPath; cacheEntry.size = resultSize; this.cacheSize += resultSize; @@ -783,14 +803,17 @@ private boolean shouldEntryBeAdded(CacheEntry entry, long size) { return true; } - private Path moveResultsToCacheDirectory(Path queryResultsPath) throws IOException { + private Path moveResultsToCacheDirectory(Set queryResultsPath) throws IOException { String dirName = UUID.randomUUID().toString(); Path cachedResultsPath = new Path(cacheDirPath, dirName); FileSystem fs = cachedResultsPath.getFileSystem(conf); + fs.mkdirs(cachedResultsPath); try { - boolean resultsMoved = Hive.moveFile(conf, queryResultsPath, cachedResultsPath, false, false, false); - if (!resultsMoved) { - throw new IOException("Failed to move " + queryResultsPath + " to " + cachedResultsPath); + for(Path resultPath:queryResultsPath) { + boolean resultsMoved = Hive.moveFile(conf, resultPath, cachedResultsPath, false, false, false); + if (!resultsMoved) { + throw new IOException("Failed to move " + queryResultsPath + " to " + cachedResultsPath); + } } } catch (IOException err) { throw err; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 1df6094dd2..62803f2040 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1486,8 +1486,7 @@ public static boolean shouldAvoidRename(FileSinkDesc conf, Configuration hConf) // * query cache is disabled // * if it is select query if (conf != null && conf.getIsQuery() && conf.getFilesToFetch() != null - && HiveConf.getVar(hConf, ConfVars.HIVE_EXECUTION_ENGINE).equalsIgnoreCase("tez") - && !HiveConf.getBoolVar(hConf, ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)){ + && HiveConf.getVar(hConf, ConfVars.HIVE_EXECUTION_ENGINE).equalsIgnoreCase("tez")){ return true; } return false; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 6252013335..e7cd0f1e2e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -42,6 +42,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.UUID; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; @@ -2339,7 +2340,7 @@ private void getMetaData(QB qb, ReadEntity parentInput) } else { // This is the only place where isQuery is set to true; it defaults to false. qb.setIsQuery(true); - Path stagingPath = getStagingDirectoryPathname(qb); + Path stagingPath = getStagingPath(qb); fname = stagingPath.toString(); ctx.setResDir(stagingPath); } @@ -7175,6 +7176,66 @@ private ExprNodeDesc getNotNullConstraintExpr(Table targetTable, Operator input, return currUDF; } + private Path getStagingPath(final QB qb) throws HiveException{ + Path queryPath = null; + if (this.isResultsCacheEnabled() && this.queryTypeCanUseCache()) { + try { + // In case this has not been initialized elsewhere. + QueryResultsCache.initialize(conf); + } catch (Exception err) { + throw new IllegalStateException(err); + } + QueryResultsCache instance = QueryResultsCache.getInstance(); + + // QueryResultsCache should have been initialized by now + //assert (instance != null); + Path resultCacheTopDir = instance.getCacheDirPath(); + String dirName = UUID.randomUUID().toString(); + queryPath = new Path(resultCacheTopDir, dirName); + } else { + queryPath = getStagingDirectoryPathname(qb); + } + return queryPath; + } + + private Path getQueryTempPath(final Path destinationPath, boolean isMmTable) + throws SemanticException { + Path fdesinationPath = destinationPath; + Path queryTmpdir = null; + try { + if (this.isResultsCacheEnabled() && this.queryTypeCanUseCache()) { + assert (!isMmTable); + QueryResultsCache instance = QueryResultsCache.getInstance(); + + // QueryResultsCache should have been initialized by now + assert (instance != null); + Path resultCacheTopDir = instance.getCacheDirPath(); + String dirName = UUID.randomUUID().toString(); + fdesinationPath = new Path(resultCacheTopDir, dirName); + } + + //FileSystem fs = queryTmpdir.getFileSystem(conf); + //TODO: better catch handling + //FileUtils.mkdir(fs, queryTmpdir, conf); + //if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { + // Utilities.FILE_OP_LOGGER.trace("Setting query directory to result cache dir: " + queryTmpdir); + //} + //} else { + // otherwise write to the file system implied by the directory + // no copy is required. we may want to revisit this policy in future + Path qPath = FileUtils.makeQualified(fdesinationPath, conf); + queryTmpdir = isMmTable ? qPath : ctx.getTempDirForFinalJobPath(qPath); + if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { + Utilities.FILE_OP_LOGGER.trace("Setting query directory " + queryTmpdir + + " from " + destinationPath + " (" + isMmTable + ")"); + } + } catch (Exception e) { + throw new SemanticException("Error creating temporary folder on: " + + destinationPath, e); + } + return queryTmpdir; + } + @SuppressWarnings("nls") protected Operator genFileSinkPlan(String dest, QB qb, Operator input) throws SemanticException { @@ -15051,6 +15112,9 @@ private void useCachedResult(QueryResultsCache.CacheEntry cacheEntry, boolean ne * Some initial checks for a query to see if we can look this query up in the results cache. */ private boolean queryTypeCanUseCache() { + if(this.qb == null || this.qb.getParseInfo() == null) { + return false; + } if (this instanceof ColumnStatsSemanticAnalyzer) { // Column stats generates "select compute_stats() .." queries. // Disable caching for these. @@ -15061,13 +15125,13 @@ private boolean queryTypeCanUseCache() { return false; } - if (qb.getParseInfo().isAnalyzeCommand()) { - return false; - } + if (qb.getParseInfo().isAnalyzeCommand()) { + return false; + } - if (qb.getParseInfo().hasInsertTables()) { - return false; - } + if (qb.getParseInfo().hasInsertTables()) { + return false; + } // HIVE-19096 - disable for explain analyze if (ctx.getExplainAnalyze() != null) {