diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 04b8c4b..21647fc 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3719,6 +3719,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal "If the query results cache is enabled. This will keep results of previously executed queries " + "to be reused if the same query is executed again."), + HIVE_QUERY_RESULTS_CACHE_WAIT_FOR_PENDING_RESULTS("hive.query.results.cache.wait.for.pending.results", true, + "Whether a query should wait for the pending results of an already running query, " + + "in order to use the cached result when it becomes ready."), + HIVE_QUERY_RESULTS_CACHE_DIRECTORY("hive.query.results.cache.directory", "/tmp/hive/_resultscache_", "Location of the query results cache directory. Temporary results from queries " + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index d789ed0..eefcaea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1860,7 +1860,23 @@ private void useFetchFromCache(CacheEntry cacheEntry) { cacheUsage = new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry); } - private void checkCacheUsage() throws Exception { + private void preExecutionCacheActions() throws Exception { + if (cacheUsage != null) { + if (cacheUsage.getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS && + plan.getFetchTask() != null) { + // The results of this query execution might be cacheable. + // Add a placeholder entry in the cache so other queries know this result is pending. + CacheEntry pendingCacheEntry = + QueryResultsCache.getInstance().addToCache(cacheUsage.getQueryInfo()); + if (pendingCacheEntry != null) { + // Update cacheUsage to reference the pending entry. + this.cacheUsage.setCacheEntry(pendingCacheEntry); + } + } + } + } + + private void postExecutionCacheActions() throws Exception { if (cacheUsage != null) { if (cacheUsage.getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) { // Using a previously cached result. @@ -1868,22 +1884,22 @@ private void checkCacheUsage() throws Exception { // Reader count already incremented during cache lookup. // Save to usedCacheEntry to ensure reader is released after query. - usedCacheEntry = cacheEntry; + this.usedCacheEntry = cacheEntry; } else if (cacheUsage.getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS && + cacheUsage.getCacheEntry() != null && plan.getFetchTask() != null) { - // The query could not be resolved using the cache, but the query results - // can be added to the cache for future queries to use. + // Save results to the cache for future queries to use. PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); - CacheEntry savedCacheEntry = - QueryResultsCache.getInstance().addToCache( - cacheUsage.getQueryInfo(), - plan.getFetchTask().getWork()); - if (savedCacheEntry != null) { - useFetchFromCache(savedCacheEntry); - // addToCache() already increments the reader count. Set usedCacheEntry so it gets released.
- usedCacheEntry = savedCacheEntry; + boolean savedToCache = QueryResultsCache.getInstance().setEntryValid( + cacheUsage.getCacheEntry(), + plan.getFetchTask().getWork()); + LOG.info("savedToCache: {}", savedToCache); + if (savedToCache) { + useFetchFromCache(cacheUsage.getCacheEntry()); + // setEntryValid() already increments the reader count. Set usedCacheEntry so it gets released. + this.usedCacheEntry = cacheUsage.getCacheEntry(); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); @@ -2000,6 +2016,8 @@ private void execute() throws CommandProcessorResponse { } } + preExecutionCacheActions(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS); // Loop while you either have tasks running, or tasks queued up while (driverCxt.isRunning()) { @@ -2099,7 +2117,7 @@ private void execute() throws CommandProcessorResponse { } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS); - checkCacheUsage(); + postExecutionCacheActions(); // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value @@ -2499,13 +2517,31 @@ private void releaseFetchTask() { } } + private boolean hasBadCacheAttempt() { + // Check if the query results were cacheable, and created a pending cache entry. + // If we successfully saved the results, the usage would have changed to QUERY_USING_CACHE. + return (cacheUsage != null && + cacheUsage.getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS && + cacheUsage.getCacheEntry() != null); + } + private void releaseCachedResult() { // Assumes the reader count has been incremented automatically by the results cache by either // lookup or creating the cache entry. if (usedCacheEntry != null) { usedCacheEntry.releaseReader(); usedCacheEntry = null; + } else if (hasBadCacheAttempt()) { + // This query created a pending cache entry, but it was never saved with real results; clean it up. + // This step is required, as there may be queries waiting on this pending cache entry. + // Removing/invalidating this entry will notify the waiters that this entry cannot be used. + try { + QueryResultsCache.getInstance().removeEntry(cacheUsage.getCacheEntry()); + } catch (Exception err) { + LOG.error("Error removing failed cache entry " + cacheUsage.getCacheEntry(), err); + } } + cacheUsage = null; } // Close and release resources within a running query process.
Since it runs under diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java index 88a056b..f9fd115 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java @@ -155,6 +155,10 @@ public void setInputs(Set inputs) { } } + public enum CacheEntryStatus { + VALID, INVALID, PENDING + } + public static class CacheEntry { private QueryInfo queryInfo; private FetchWork fetchWork; @@ -163,13 +167,9 @@ public void setInputs(Set inputs) { // Cache administration private long createTime; private long size; - private AtomicBoolean valid = new AtomicBoolean(false); private AtomicInteger readers = new AtomicInteger(0); private ScheduledFuture invalidationFuture = null; - - public boolean isValid() { - return valid.get(); - } + private volatile CacheEntryStatus status = CacheEntryStatus.PENDING; public void releaseReader() { int readerCount = 0; @@ -177,14 +177,13 @@ public void releaseReader() { readerCount = readers.decrementAndGet(); } LOG.debug("releaseReader: entry: {}, readerCount: {}", this, readerCount); - Preconditions.checkState(readerCount >= 0); cleanupIfNeeded(); } public String toString() { return "CacheEntry query: [" + getQueryInfo().getLookupInfo().getQueryText() - + "], location: " + cachedResultsPath + + "], status: " + status + ", location: " + cachedResultsPath + ", size: " + size; } @@ -192,13 +191,12 @@ public boolean addReader() { boolean added = false; int readerCount = 0; synchronized (this) { - if (valid.get()) { + if (status == CacheEntryStatus.VALID) { readerCount = readers.incrementAndGet(); added = true; } } - Preconditions.checkState(readerCount > 0); - LOG.debug("addReader: entry: {}, readerCount: {}", this, readerCount); + LOG.debug("addReader: entry: {}, readerCount: {}, added: {}", this, readerCount, added); return added; } @@ -207,32 +205,36 @@ private int numReaders() { } private void invalidate() { - boolean wasValid = setValidity(false); - - if (wasValid) { - LOG.info("Invalidated cache entry: {}", this); - + LOG.info("Invalidating cache entry: {}", this); + CacheEntryStatus prevStatus = setStatus(CacheEntryStatus.INVALID); + if (prevStatus == CacheEntryStatus.VALID) { if (invalidationFuture != null) { // The cache entry has just been invalidated, no need for the scheduled invalidation. invalidationFuture.cancel(false); } cleanupIfNeeded(); + } else if (prevStatus == CacheEntryStatus.PENDING) { + // Need to notify any queries waiting on the change from pending status. + synchronized (this) { + this.notifyAll(); + } } } - /** - * Set the validity, returning the previous validity value. - * @param valid - * @return - */ - private boolean setValidity(boolean valid) { - synchronized(this) { - return this.valid.getAndSet(valid); + public CacheEntryStatus getStatus() { + return status; + } + + private CacheEntryStatus setStatus(CacheEntryStatus newStatus) { + synchronized (this) { + CacheEntryStatus oldStatus = status; + status = newStatus; + return oldStatus; } } private void cleanupIfNeeded() { - if (!isValid() && readers.get() <= 0) { + if (status == CacheEntryStatus.INVALID && readers.get() <= 0) { QueryResultsCache.cleanupEntry(this); } } @@ -252,6 +254,37 @@ public QueryInfo getQueryInfo() { public Path getCachedResultsPath() { return cachedResultsPath; } + + /** + * Wait for the cache entry to go from PENDING to VALID status. 
+ * @return true if the cache entry successfully changed to VALID status, + * false if the status changed from PENDING to INVALID + */ + public boolean waitForValidStatus() { + LOG.info("Waiting on pending cacheEntry"); + long timeout = 1000; + + while (true) { + try { + switch (status) { + case VALID: + return true; + case INVALID: + return false; + case PENDING: + // Status has not changed, continue waiting. + break; + } + + synchronized (this) { + this.wait(timeout); + } + } catch (InterruptedException err) { + Thread.currentThread().interrupt(); + return false; + } + } + } } // Allow lookup by query string @@ -264,6 +297,7 @@ public Path getCachedResultsPath() { private final HiveConf conf; private Path cacheDirPath; + private Path zeroRowsPath; private long cacheSize = 0; private long maxCacheSize; private long maxEntrySize; @@ -284,6 +318,9 @@ private QueryResultsCache(HiveConf configuration) throws IOException { FsPermission fsPermission = new FsPermission("700"); fs.mkdirs(cacheDirPath, fsPermission); + // Create non-existent path for 0-row results + zeroRowsPath = new Path(cacheDirPath, "dummy_zero_rows"); + // Results cache directory should be cleaned up at process termination. fs.deleteOnExit(cacheDirPath); @@ -324,7 +361,7 @@ public static QueryResultsCache getInstance() { * using CacheEntry.releaseReader(). * @return The cached result if there is a match in the cache, or null if no match is found. */ - public CacheEntry lookup(LookupInfo request, boolean addReader) { + public CacheEntry lookup(LookupInfo request) { CacheEntry result = null; LOG.debug("QueryResultsCache lookup for query: {}", request.queryText); @@ -334,27 +371,26 @@ public CacheEntry lookup(LookupInfo request, boolean addReader) { readLock.lock(); Set<CacheEntry> candidates = queryMap.get(request.queryText); if (candidates != null) { + CacheEntry pendingResult = null; for (CacheEntry candidate : candidates) { if (entryMatches(request, candidate)) { - result = candidate; - break; + CacheEntryStatus entryStatus = candidate.status; + if (entryStatus == CacheEntryStatus.VALID) { + result = candidate; + break; + } else if (entryStatus == CacheEntryStatus.PENDING && pendingResult == null) { + pendingResult = candidate; + } } } + // Try to find a valid entry, but settle for a pending entry if that is all we have. + if (result == null && pendingResult != null) { + result = pendingResult; + } + if (result != null) { lru.get(result); // Update LRU - - if (!result.isValid()) { - // Entry is in the cache, but not valid. - // This can happen when the entry is first added, before the data has been moved - // to the results cache directory. We cannot use this entry yet. - result = null; - } else { - if (addReader) { - // Caller will need to be responsible for releasing the reader count. - result.addReader(); - } - } } } } finally { @@ -367,111 +403,124 @@ public CacheEntry lookup(LookupInfo request, boolean addReader) { } /** - * Add an entry to the query results cache. + * Add an entry to the cache. + * The new entry will be in PENDING state and not usable until setEntryValid() is called on the entry. + * @param queryInfo + * @return The new cache entry, in PENDING state. + */ + public CacheEntry addToCache(QueryInfo queryInfo) { + // Create placeholder entry with PENDING state.
+ String queryText = queryInfo.getLookupInfo().getQueryText(); + CacheEntry addedEntry = new CacheEntry(); + addedEntry.queryInfo = queryInfo; + + Lock writeLock = rwLock.writeLock(); + try { + writeLock.lock(); + + LOG.info("Adding placeholder cache entry for query '{}'", queryText); + + // Add the entry to the cache structures while under write lock. + Set<CacheEntry> entriesForQuery = queryMap.get(queryText); + if (entriesForQuery == null) { + entriesForQuery = new HashSet<CacheEntry>(); + queryMap.put(queryText, entriesForQuery); + } + entriesForQuery.add(addedEntry); + lru.put(addedEntry, addedEntry); + } finally { + writeLock.unlock(); + } + + return addedEntry; + } + + /** + * Updates a pending cache entry with a FetchWork result from a finished query. + * If successful, the cache entry will be set to VALID status and be usable for cached queries. * Important: Adding the entry to the cache will increment the reader count for the cache entry. * CacheEntry.releaseReader() should be called when the caller is done with the cache entry. - * - * @param queryInfo + * @param cacheEntry * @param fetchWork - * @return The entry if added to the cache. null if the entry is not added. + * @return true if the entry was successfully updated with the query results, false otherwise. */ - public CacheEntry addToCache(QueryInfo queryInfo, FetchWork fetchWork) { - - CacheEntry addedEntry = null; + public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) { + String queryText = cacheEntry.getQueryText(); boolean dataDirMoved = false; Path queryResultsPath = null; Path cachedResultsPath = null; - String queryText = queryInfo.getLookupInfo().getQueryText(); - // Should we remove other candidate entries if they are equivalent to these query results? try { - CacheEntry potentialEntry = new CacheEntry(); - potentialEntry.queryInfo = queryInfo; + boolean requiresMove = true; queryResultsPath = fetchWork.getTblDir(); FileSystem resultsFs = queryResultsPath.getFileSystem(conf); - ContentSummary cs = resultsFs.getContentSummary(queryResultsPath); - potentialEntry.size = cs.getLength(); - - Lock writeLock = rwLock.writeLock(); - try { - writeLock.lock(); - - if (!shouldEntryBeAdded(potentialEntry)) { - return null; - } - if (!clearSpaceForCacheEntry(potentialEntry)) { - return null; - } - - LOG.info("Adding cache entry for query '{}'", queryText); - - // Add the entry to the cache structures while under write lock. Do not mark the entry - // as valid yet, since the query results have not yet been moved to the cache directory. - // Do the data move after unlocking since it might take time. - // Mark the entry as valid once the data has been moved to the cache directory. - Set<CacheEntry> entriesForQuery = queryMap.get(queryText); - if (entriesForQuery == null) { - entriesForQuery = new HashSet<CacheEntry>(); - queryMap.put(queryText, entriesForQuery); - } - entriesForQuery.add(potentialEntry); - lru.put(potentialEntry, potentialEntry); - cacheSize += potentialEntry.size; - addedEntry = potentialEntry; + long resultSize; + if (resultsFs.exists(queryResultsPath)) { + ContentSummary cs = resultsFs.getContentSummary(queryResultsPath); + resultSize = cs.getLength(); + } else { + // No actual result directory, no need to move anything. + cachedResultsPath = zeroRowsPath; + resultSize = 0; + requiresMove = false; + } - } finally { - writeLock.unlock(); + if (!shouldEntryBeAdded(cacheEntry, resultSize)) { + return false; } - // Move the query results to the query cache directory.
- cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath); - dataDirMoved = true; + if (requiresMove) { + // Move the query results to the query cache directory. + cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath); + dataDirMoved = true; + } LOG.info("Moved query results from {} to {} (size {}) for query '{}'", - queryResultsPath, cachedResultsPath, cs.getLength(), queryText); + queryResultsPath, cachedResultsPath, resultSize, queryText); // Create a new FetchWork to reference the new cache location. FetchWork fetchWorkForCache = new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit()); fetchWorkForCache.setCachedResult(true); - addedEntry.fetchWork = fetchWorkForCache; - addedEntry.cachedResultsPath = cachedResultsPath; - addedEntry.createTime = System.currentTimeMillis(); - addedEntry.setValidity(true); + cacheEntry.fetchWork = fetchWorkForCache; + cacheEntry.cachedResultsPath = cachedResultsPath; + cacheEntry.size = resultSize; + this.cacheSize += resultSize; + cacheEntry.createTime = System.currentTimeMillis(); + cacheEntry.setStatus(CacheEntryStatus.VALID); // Mark this entry as being in use. Caller will need to release later. - addedEntry.addReader(); + cacheEntry.addReader(); - scheduleEntryInvalidation(addedEntry); + scheduleEntryInvalidation(cacheEntry); + + // Notify any queries waiting on this cacheEntry to become valid. + synchronized (cacheEntry) { + cacheEntry.notifyAll(); + } } catch (Exception err) { LOG.error("Failed to create cache entry for query results for query: " + queryText, err); - if (addedEntry != null) { - // If the entry was already added to the cache when we hit error, clean up properly. - - if (dataDirMoved) { - // If data was moved from original location to cache directory, we need to move it back! - LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath); - try { - FileSystem fs = cachedResultsPath.getFileSystem(conf); - fs.rename(cachedResultsPath, queryResultsPath); - addedEntry.cachedResultsPath = null; - } catch (Exception err2) { - String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText; - LOG.error(errMsg); - throw new RuntimeException(errMsg); - } - } - - addedEntry.invalidate(); - if (addedEntry.numReaders() > 0) { - addedEntry.releaseReader(); + if (dataDirMoved) { + // If data was moved from original location to cache directory, we need to move it back! + LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath); + try { + FileSystem fs = cachedResultsPath.getFileSystem(conf); + fs.rename(cachedResultsPath, queryResultsPath); + cacheEntry.size = 0; + cacheEntry.cachedResultsPath = null; + } catch (Exception err2) { + String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText; + LOG.error(errMsg); + throw new RuntimeException(errMsg); } } - return null; + // Invalidate the entry. Rely on query cleanup to remove from lookup. 
+ cacheEntry.invalidate(); + return false; } - return addedEntry; + return true; } public void clear() { @@ -524,7 +573,7 @@ private boolean entryMatches(LookupInfo lookupInfo, CacheEntry entry) { return true; } - private void removeEntry(CacheEntry entry) { + public void removeEntry(CacheEntry entry) { entry.invalidate(); removeFromLookup(entry); lru.remove(entry); @@ -535,9 +584,14 @@ private void removeEntry(CacheEntry entry) { private void removeFromLookup(CacheEntry entry) { String queryString = entry.getQueryText(); Set entries = queryMap.get(queryString); - Preconditions.checkState(entries != null); + if (entries == null) { + LOG.warn("ResultsCache: no entry for {}", queryString); + return; + } boolean deleted = entries.remove(entry); - Preconditions.checkState(deleted); + if (!deleted) { + LOG.warn("ResultsCache: Attempted to remove entry but it was not in the cache: {}", entry); + } if (entries.isEmpty()) { queryMap.remove(queryString); } @@ -553,10 +607,14 @@ private void calculateEntrySize(CacheEntry entry, FetchWork fetchWork) throws IO /** * Determines if the cache entry should be added to the results cache. */ - private boolean shouldEntryBeAdded(CacheEntry entry) { + private boolean shouldEntryBeAdded(CacheEntry entry, long size) { // Assumes the cache lock has already been taken. - if (maxEntrySize >= 0 && entry.size > maxEntrySize) { - LOG.debug("Cache entry size {} larger than max entry size ({})", entry.size, maxEntrySize); + if (maxEntrySize >= 0 && size > maxEntrySize) { + LOG.debug("Cache entry size {} larger than max entry size ({})", size, maxEntrySize); + return false; + } + + if (!clearSpaceForCacheEntry(entry, size)) { return false; } @@ -571,41 +629,41 @@ private Path moveResultsToCacheDirectory(Path queryResultsPath) throws IOExcepti return cachedResultsPath; } - private boolean hasSpaceForCacheEntry(CacheEntry entry) { + private boolean hasSpaceForCacheEntry(CacheEntry entry, long size) { if (maxCacheSize >= 0) { - return (cacheSize + entry.size) <= maxCacheSize; + return (cacheSize + size) <= maxCacheSize; } // Negative max cache size means unbounded. return true; } - private boolean clearSpaceForCacheEntry(CacheEntry entry) { - if (hasSpaceForCacheEntry(entry)) { + private boolean clearSpaceForCacheEntry(CacheEntry entry, long size) { + if (hasSpaceForCacheEntry(entry, size)) { return true; } LOG.info("Clearing space for cache entry for query: [{}] with size {}", - entry.getQueryText(), entry.size); + entry.getQueryText(), size); // Entries should be in LRU order in the keyset iterator. CacheEntry[] entries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY); for (CacheEntry removalCandidate : entries) { - if (!removalCandidate.isValid()) { - // Likely an entry which is still getting its results moved to the cache directory. + if (removalCandidate.getStatus() != CacheEntryStatus.VALID) { + // Only entries marked as valid should have results that can be removed. continue; } // Only delete the entry if it has no readers. 
if (!(removalCandidate.numReaders() > 0)) { LOG.info("Removing entry: {}", removalCandidate); removeEntry(removalCandidate); - if (hasSpaceForCacheEntry(entry)) { + if (hasSpaceForCacheEntry(entry, size)) { return true; } } } LOG.info("Could not free enough space for cache entry for query: [{}] with size {}", - entry.getQueryText(), entry.size); + entry.getQueryText(), size); return false; } @@ -633,11 +691,11 @@ public static void cleanupInstance() { private void scheduleEntryInvalidation(final CacheEntry entry) { if (maxEntryLifetime >= 0) { - // Schedule task to invalidate cache entry. + // Schedule task to invalidate cache entry and remove from lookup. ScheduledFuture future = invalidationExecutor.schedule(new Runnable() { @Override public void run() { - entry.invalidate(); + removeEntry(entry); } }, maxEntryLifetime, TimeUnit.MILLISECONDS); entry.invalidationFuture = future; @@ -645,9 +703,11 @@ public void run() { } private static void cleanupEntry(final CacheEntry entry) { - Preconditions.checkState(!entry.isValid()); + Preconditions.checkState(entry.getStatus() == CacheEntryStatus.INVALID); + + final HiveConf conf = getInstance().conf; - if (entry.cachedResultsPath != null) { + if (entry.cachedResultsPath != null && + !getInstance().zeroRowsPath.equals(entry.cachedResultsPath)) { deletionExecutor.execute(new Runnable() { @Override public void run() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index d1609e1..e417f4c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -14280,14 +14280,38 @@ private boolean checkResultsCache(QueryResultsCache.LookupInfo lookupInfo) { } // Don't increment the reader count for explain queries. boolean isExplainQuery = (ctx.getExplainConfig() != null); - QueryResultsCache.CacheEntry cacheEntry = - QueryResultsCache.getInstance().lookup(lookupInfo, !isExplainQuery); + QueryResultsCache.CacheEntry cacheEntry = QueryResultsCache.getInstance().lookup(lookupInfo); if (cacheEntry != null) { - // Use the cache rather than full query execution. - useCachedResult(cacheEntry); + // Potentially wait on the cache entry if the entry is in PENDING status. + // Blocking here can potentially be dangerous - for example, if the global compile lock + // is used, this will block all subsequent queries that try to acquire the compile lock, + // so it should not be done unless parallel compilation is enabled. + // We also do not want to block for explain queries. + if (cacheEntry.getStatus() == QueryResultsCache.CacheEntryStatus.PENDING) { + if (!isExplainQuery && + conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_WAIT_FOR_PENDING_RESULTS) && + conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION)) { + if (!cacheEntry.waitForValidStatus()) { + LOG.info("Waiting on pending cacheEntry, but it failed to become valid"); + return false; + } + } else { + LOG.info("Not waiting for pending cacheEntry"); + return false; + } + } - // At this point the caller should return from semantic analysis. - return true; + if (cacheEntry.getStatus() == QueryResultsCache.CacheEntryStatus.VALID) { + if (!isExplainQuery) { + if (!cacheEntry.addReader()) { + return false; + } + } + // Use the cache rather than full query execution. + // At this point the caller should return from semantic analysis.
+ useCachedResult(cacheEntry); + return true; + } } return false; } diff --git a/ql/src/test/queries/clientpositive/results_cache_empty_result.q b/ql/src/test/queries/clientpositive/results_cache_empty_result.q new file mode 100644 index 0000000..6213671 --- /dev/null +++ b/ql/src/test/queries/clientpositive/results_cache_empty_result.q @@ -0,0 +1,13 @@ + +set hive.query.results.cache.enabled=true; + +explain +select count(*), key from src a where key < 0 group by key; +select count(*), key from src a where key < 0 group by key; + +set test.comment="Cache should be used for this query"; +set test.comment; +explain +select count(*), key from src a where key < 0 group by key; +select count(*), key from src a where key < 0 group by key; + diff --git a/ql/src/test/results/clientpositive/llap/results_cache_empty_result.q.out b/ql/src/test/results/clientpositive/llap/results_cache_empty_result.q.out new file mode 100644 index 0000000..b63d079 --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/results_cache_empty_result.q.out @@ -0,0 +1,99 @@ +PREHOOK: query: explain +select count(*), key from src a where key < 0 group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*), key from src a where key < 0 group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (UDFToDouble(key) < 0.0D) (type: boolean) + Statistics: Num rows: 166 Data size: 14442 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + keys: key (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: bigint) + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col1 (type: bigint), _col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*), key from src a where key < 0 group by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*), key from src a where key < 0 group by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src 
+#### A masked pattern was here #### +test.comment="Cache should be used for this query" +PREHOOK: query: explain +select count(*), key from src a where key < 0 group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*), key from src a where key < 0 group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select count(*), key from src a where key < 0 group by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +POSTHOOK: query: select count(*), key from src a where key < 0 group by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src diff --git a/ql/src/test/results/clientpositive/results_cache_empty_result.q.out b/ql/src/test/results/clientpositive/results_cache_empty_result.q.out new file mode 100644 index 0000000..000e5f0 --- /dev/null +++ b/ql/src/test/results/clientpositive/results_cache_empty_result.q.out @@ -0,0 +1,91 @@ +PREHOOK: query: explain +select count(*), key from src a where key < 0 group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*), key from src a where key < 0 group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(key) < 0.0D) (type: boolean) + Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + keys: key (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col1 (type: bigint), _col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*), key from src a where key < 0 group by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*), key from src a where key < 0 group by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +test.comment="Cache should be used for this query" +PREHOOK: query: explain +select count(*), key from src a where key < 0 group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*), key from src a where key < 0 group 
by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select count(*), key from src a where key < 0 group by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*), key from src a where key < 0 group by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here ####
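
Reviewer note: the sketch below is not part of the patch; it condenses the pending-entry lifecycle that this change spreads across Driver.preExecutionCacheActions()/postExecutionCacheActions(), Driver.releaseCachedResult(), and SemanticAnalyzer.checkResultsCache(). The QueryResultsCache, CacheEntry, and CacheEntryStatus calls are the ones introduced above; the class ResultsCacheFlowSketch, the produce()/consume() methods, and the runQuery() placeholder are illustrative names only, a minimal sketch of the intended usage rather than the actual driver code.

    // ResultsCacheFlowSketch.java - illustrative usage of the pending-entry API.
    import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
    import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
    import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntryStatus;
    import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.LookupInfo;
    import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.QueryInfo;
    import org.apache.hadoop.hive.ql.plan.FetchWork;

    public class ResultsCacheFlowSketch {

      // Producer side: register a PENDING placeholder before execution, then either
      // publish the results or remove the entry so that any waiters are unblocked.
      static void produce(QueryInfo queryInfo, FetchWork fetchWork) {
        QueryResultsCache cache = QueryResultsCache.getInstance();
        CacheEntry pending = cache.addToCache(queryInfo);  // entry starts in PENDING status
        boolean published = false;
        try {
          runQuery();  // placeholder for normal query execution
          // setEntryValid() moves the results into the cache directory, marks the
          // entry VALID, increments its reader count, and notifies any waiters.
          published = cache.setEntryValid(pending, fetchWork);
        } finally {
          if (published) {
            pending.releaseReader();  // release the reader taken by setEntryValid()
          } else {
            cache.removeEntry(pending);  // waiters see INVALID and fall back
          }
        }
      }

      // Consumer side: an identical concurrent query may observe the PENDING entry
      // and, when hive.query.results.cache.wait.for.pending.results and parallel
      // compilation are both enabled, block in waitForValidStatus() instead of
      // recompiling and recomputing the same results.
      static boolean consume(LookupInfo lookupInfo) {
        QueryResultsCache cache = QueryResultsCache.getInstance();
        CacheEntry entry = cache.lookup(lookupInfo);
        if (entry != null && entry.getStatus() == CacheEntryStatus.PENDING
            && !entry.waitForValidStatus()) {
          entry = null;  // producer failed; fall back to normal execution
        }
        if (entry != null && entry.addReader()) {  // addReader() succeeds only for VALID entries
          try {
            return true;  // fetch rows from the entry's cached FetchWork / results path here
          } finally {
            entry.releaseReader();
          }
        }
        return false;
      }

      static void runQuery() { /* placeholder for query execution */ }
    }

The invariant worth checking while reviewing: every PENDING entry must eventually transition to VALID (via setEntryValid) or INVALID (via removeEntry/invalidate), since waitForValidStatus() loops until the status changes; the cleanup in Driver.releaseCachedResult() exists to guarantee this for failed or uncacheable queries.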