diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index ac5ae573d6..b1a3646624 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -502,32 +502,39 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
         return false;
       }
 
-      if (requiresMove) {
-        // Move the query results to the query cache directory.
-        cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
-        dataDirMoved = true;
-      }
-      LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
-          queryResultsPath, cachedResultsPath, resultSize, queryText);
-
-      // Create a new FetchWork to reference the new cache location.
-      FetchWork fetchWorkForCache =
-          new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
-      fetchWorkForCache.setCachedResult(true);
-      cacheEntry.fetchWork = fetchWorkForCache;
-      cacheEntry.cachedResultsPath = cachedResultsPath;
-      cacheEntry.size = resultSize;
-      this.cacheSize += resultSize;
-      cacheEntry.createTime = System.currentTimeMillis();
-
-      cacheEntry.setStatus(CacheEntryStatus.VALID);
-      // Mark this entry as being in use. Caller will need to release later.
-      cacheEntry.addReader();
-
-      scheduleEntryInvalidation(cacheEntry);
-
-      // Notify any queries waiting on this cacheEntry to become valid.
+      // Synchronize on the cache entry so that no one else can invalidate this entry
+      // while we are in the process of setting it to valid.
       synchronized (cacheEntry) {
+        if (cacheEntry.getStatus() == CacheEntryStatus.INVALID) {
+          // Entry either expired, or was invalidated due to table updates
+          return false;
+        }
+
+        if (requiresMove) {
+          // Move the query results to the query cache directory.
+          cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
+          dataDirMoved = true;
+        }
+        LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
+            queryResultsPath, cachedResultsPath, resultSize, queryText);
+
+        // Create a new FetchWork to reference the new cache location.
+        FetchWork fetchWorkForCache =
+            new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
+        fetchWorkForCache.setCachedResult(true);
+        cacheEntry.fetchWork = fetchWorkForCache;
+        cacheEntry.cachedResultsPath = cachedResultsPath;
+        cacheEntry.size = resultSize;
+        this.cacheSize += resultSize;
+        cacheEntry.createTime = System.currentTimeMillis();
+
+        cacheEntry.setStatus(CacheEntryStatus.VALID);
+        // Mark this entry as being in use. Caller will need to release later.
+        cacheEntry.addReader();
+
+        scheduleEntryInvalidation(cacheEntry);
+
+        // Notify any queries waiting on this cacheEntry to become valid.
         cacheEntry.notifyAll();
       }
 
@@ -564,7 +571,11 @@ public void clear() {
     try {
       writeLock.lock();
       LOG.info("Clearing the results cache");
-      for (CacheEntry entry : lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY)) {
+      CacheEntry[] allEntries = null;
+      synchronized (lru) {
+        allEntries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY);
+      }
+      for (CacheEntry entry : allEntries) {
         try {
           removeEntry(entry);
         } catch (Exception err) {
@@ -611,10 +622,15 @@ private boolean entryMatches(LookupInfo lookupInfo, CacheEntry entry) {
 
   public void removeEntry(CacheEntry entry) {
     entry.invalidate();
-    removeFromLookup(entry);
-    lru.remove(entry);
-    // Should the cache size be updated here, or after the result data has actually been deleted?
-    cacheSize -= entry.size;
+    rwLock.writeLock().lock();
+    try {
+      removeFromLookup(entry);
+      lru.remove(entry);
+      // Should the cache size be updated here, or after the result data has actually been deleted?
+      cacheSize -= entry.size;
+    } finally {
+      rwLock.writeLock().unlock();
+    }
   }
 
   private void removeFromLookup(CacheEntry entry) {
@@ -674,6 +690,20 @@ private boolean hasSpaceForCacheEntry(CacheEntry entry, long size) {
     return true;
   }
 
+  private CacheEntry findEntryToRemove() {
+    // Entries should be in LRU order in the keyset iterator.
+    Set<CacheEntry> entries = lru.keySet();
+    synchronized (lru) {
+      for (CacheEntry removalCandidate : entries) {
+        if (removalCandidate.getStatus() != CacheEntryStatus.VALID) {
+          continue;
+        }
+        return removalCandidate;
+      }
+    }
+    return null;
+  }
+
   private boolean clearSpaceForCacheEntry(CacheEntry entry, long size) {
     if (hasSpaceForCacheEntry(entry, size)) {
       return true;
@@ -682,20 +712,14 @@ private boolean clearSpaceForCacheEntry(CacheEntry entry, long size) {
     LOG.info("Clearing space for cache entry for query: [{}] with size {}",
         entry.getQueryText(), size);
 
-    // Entries should be in LRU order in the keyset iterator.
-    CacheEntry[] entries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY);
-    for (CacheEntry removalCandidate : entries) {
-      if (removalCandidate.getStatus() != CacheEntryStatus.VALID) {
-        // Only entries marked as valid should have results that can be removed.
-        continue;
-      }
-      // Only delete the entry if it has no readers.
-      if (!(removalCandidate.numReaders() > 0)) {
-        LOG.info("Removing entry: {}", removalCandidate);
-        removeEntry(removalCandidate);
-        if (hasSpaceForCacheEntry(entry, size)) {
-          return true;
-        }
+    CacheEntry removalCandidate;
+    while ((removalCandidate = findEntryToRemove()) != null) {
+      LOG.info("Removing entry: {}", removalCandidate);
+      removeEntry(removalCandidate);
+      // TODO: Should we wait for the entry to actually be deleted from HDFS? Would have to
+      // poll the reader count, waiting for it to reach 0, at which point cleanup should occur.
+      if (hasSpaceForCacheEntry(entry, size)) {
+        return true;
+      }
     }
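
The first hunk is a check-then-act fix: before the patch, an invalidation could land between the producer finishing its results and the entry being marked VALID, leaving a VALID entry whose invalidation had already "happened". The patch widens the synchronized block so the status re-check, the publication of the new state, and the notifyAll() all happen under the entry's monitor. The sketch below is a minimal, self-contained illustration of that pattern, not Hive code: `Entry`, `Status`, and `waitForValid` are hypothetical stand-ins for CacheEntry, CacheEntryStatus, and the waiting-reader logic.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for CacheEntry: one monitor guards the status check,
// the state change, and the wakeup, so a racing invalidate() can never be lost.
public class EntryStateSketch {
  enum Status { PENDING, VALID, INVALID }

  static class Entry {
    private Status status = Status.PENDING;

    // Producer path: publish results only if no invalidation won the race.
    synchronized boolean setValid() {
      if (status == Status.INVALID) {
        // Entry expired or was invalidated (e.g. by a table update) first.
        return false;
      }
      status = Status.VALID;
      notifyAll(); // wake readers blocked in waitForValid()
      return true;
    }

    // Invalidator path: expiry timer or table-update listener.
    synchronized void invalidate() {
      status = Status.INVALID;
      notifyAll(); // waiters must also wake on failure, not just success
    }

    // Reader path: a query that found a matching PENDING entry waits for it.
    synchronized Status waitForValid(long timeoutMs) throws InterruptedException {
      long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
      while (status == Status.PENDING) {
        long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
        if (remainingMs <= 0) {
          break; // timed out while still pending
        }
        wait(remainingMs); // releases the monitor until notifyAll() or timeout
      }
      return status;
    }
  }
}
```

The essential points mirror the hunk: the status re-check and the state change happen under the same monitor that the invalidator uses, and both terminal transitions call notifyAll() so a waiter never sleeps past a decision.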
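
The remaining hunks all enforce one rule for the lru map: individual calls on it are treated as thread-safe, but iteration over keySet() is not, so every traversal now happens either inside synchronized (lru) (findEntryToRemove) or on a snapshot array taken under that monitor (clear). Below is a self-contained sketch of both idioms. It assumes lru is a Collections.synchronizedMap wrapper around an access-ordered LinkedHashMap, which matches how the patch treats it but is an assumption here; the String keys and method names are made up.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class SynchronizedLruSketch {
  // accessOrder=true makes iteration run least-recently-used first; the
  // synchronizedMap wrapper makes single calls thread-safe but NOT iteration.
  private final Map<String, Long> lru =
      Collections.synchronizedMap(new LinkedHashMap<>(16, 0.75f, true));

  // Idiom from findEntryToRemove(): iterate while holding the map's monitor.
  public String findEvictionCandidate() {
    synchronized (lru) {
      for (String key : lru.keySet()) {
        return key; // first key in access order == least recently used
      }
    }
    return null;
  }

  // Idiom from clear(): snapshot under the monitor, then work off the snapshot,
  // so per-entry removal can take other locks without holding the monitor.
  public void clearAll() {
    String[] snapshot;
    synchronized (lru) {
      snapshot = lru.keySet().toArray(new String[0]);
    }
    for (String key : snapshot) {
      lru.remove(key); // safe outside the monitor: a single call on the wrapper
    }
  }
}
```

A likely motivation for snapshotting rather than iterating in place: removeEntry() now takes the cache's write lock, and calling it while holding the map's monitor would pin two locks at once and invite lock-ordering problems with threads that touch lru under that write lock.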