diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 05c2acd..8d9d186 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3684,6 +3684,29 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_EXEC_INPUT_LISTING_MAX_THREADS("hive.exec.input.listing.max.threads", 0, new SizeValidator(0L, true, 1024L, true),
         "Maximum number of threads that Hive uses to list file information from file systems (recommended > 1 for blobstore)."),
 
+    HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
+        "If the query results cache is enabled. This will keep results of previously executed queries " +
+        "so they can be reused if the same query is executed again."),
+
+    HIVE_QUERY_RESULTS_CACHE_DIRECTORY("hive.query.results.cache.directory",
+        "/tmp/hive/_resultscache_",
+        "Location of the query results cache directory. Temporary results from queries will be moved to this location."),
+
+    HIVE_QUERY_RESULTS_CACHE_DIR_PERMISSION("hive.query.results.cache.dir.permission", "700",
+        "The permission for the query results cache directory that gets created."),
+
+    HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_LIFETIME("hive.query.results.cache.max.entry.lifetime", "900s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Maximum lifetime for an entry in the query results cache. A nonpositive value means infinite."),
+
+    HIVE_QUERY_RESULTS_CACHE_MAX_SIZE("hive.query.results.cache.max.size",
+        (long) 32 * 1024 * 1024,
+        "Maximum total size in bytes of the query results cache."),
+
+    HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE("hive.query.results.cache.max.entry.size",
+        (long) 1 * 1024 * 1024,
+        "Maximum size in bytes of an entry in the query results cache."),
+
     /* BLOBSTORE section */
 
     HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n",
diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index 2767bca..764a832 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -71,6 +71,7 @@ public static final String TEZ_INIT_OPERATORS = "TezInitializeOperators";
   public static final String LOAD_HASHTABLE = "LoadHashtable";
   public static final String TEZ_GET_SESSION = "TezGetSession";
 
+  public static final String SAVE_TO_RESULTS_CACHE = "saveToResultsCache";
 
   public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning";
   public static final String SPARK_BUILD_PLAN = "SparkBuildPlan";
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 1017249..f641b48 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -251,6 +251,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
   ptf.q,\
   ptf_matchpath.q,\
   ptf_streaming.q,\
+  results_cache_1.q,\
   sample1.q,\
   selectDistinctStar.q,\
   select_dummy_source.q,\
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 3f377f9..3f1f227 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -100,6 +100,7 @@ import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -993,6 +994,9 @@ public void clearTestSideEffects() throws Exception { return; } + // Remove any cached results from the previous test. + QueryResultsCache.cleanupInstance(); + // allocate and initialize a new conf since a test can // modify conf by using 'set' commands conf = new HiveConf(IDriver.class); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 74595b0..274aaad 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -55,6 +55,9 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.cache.results.CacheUsage; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.DagUtils; import org.apache.hadoop.hive.ql.exec.ExplainTask; @@ -189,6 +192,9 @@ // either initTxnMgr or from the SessionState, in that order. private HiveTxnManager queryTxnMgr; + private CacheUsage cacheUsage; + private CacheEntry usedCacheEntry; + private enum DriverState { INITIALIZED, COMPILING, @@ -640,6 +646,11 @@ public void run() { } LOG.info("Semantic Analysis Completed"); + // Retrieve information about cache usage for the query. + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) { + cacheUsage = sem.getCacheUsage(); + } + // validate the plan sem.validate(); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE); @@ -1794,6 +1805,52 @@ private CommandProcessorResponse createProcessorResponse(int ret) { return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError); } + private void useFetchFromCache(CacheEntry cacheEntry) { + // Change query FetchTask to use new location specified in results cache. + FetchTask fetchTask = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork(), conf); + fetchTask.initialize(queryState, plan, null, ctx.getOpContext()); + plan.setFetchTask(fetchTask); + cacheUsage = new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry); + } + + private void checkCacheUsage() throws Exception { + if (cacheUsage != null) { + if (cacheUsage.getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) { + // Using a previously cached result. + CacheEntry cacheEntry = cacheUsage.getCacheEntry(); + if (cacheEntry.addReader()) { + usedCacheEntry = cacheEntry; + } else { + String msg = "Cached result is no longer valid for query: " + cacheEntry.toString(); + LOG.info(msg); + + // Need to handle properly - the query needs to be recompiled, and this (now invalid) + // cache result will not be chosen next time. 
+ // TODO: Use the query retry mechanism that will be done with HIVE-17626 + throw new CommandNeedRetryException(msg); + } + } else if (cacheUsage.getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS && + plan.getFetchTask() != null) { + // The query could not be resolved using the cache, but the query results + // can be added to the cache for future queries to use. + PerfLogger perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); + + CacheEntry savedCacheEntry = + QueryResultsCache.getInstance().addToCache( + cacheUsage.getQueryInfo(), + plan.getFetchTask().getWork()); + if (savedCacheEntry != null) { + useFetchFromCache(savedCacheEntry); + // addToCache() already increments the reader count. Set usedCacheEntry so it gets released. + usedCacheEntry = savedCacheEntry; + } + + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); + } + } + } + private void execute() throws CommandNeedRetryException, CommandProcessorResponse { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE); @@ -2015,6 +2072,8 @@ private void execute() throws CommandNeedRetryException, CommandProcessorRespons } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS); + checkCacheUsage(); + // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); @@ -2435,12 +2494,23 @@ private void releaseFetchTask() { LOG.debug(" Exception while clearing the FetchTask ", e); } } + + private void releaseCachedResult() { + // Assumes the reader count has been incremented automatically by the results cache by either + // lookup or creating the cache entry. + if (usedCacheEntry != null) { + usedCacheEntry.releaseReader(); + usedCacheEntry = null; + } + } + // Close and release resources within a running query process. Since it runs under // driver state COMPILING, EXECUTING or INTERRUPT, it would not have race condition // with the releases probably running in the other closing thread. private int closeInProcess(boolean destroyed) { releaseDriverContext(); releasePlan(); + releaseCachedResult(); releaseFetchTask(); releaseResStream(); releaseContext(); @@ -2470,6 +2540,7 @@ public int close() { return 0; } releasePlan(); + releaseCachedResult(); releaseFetchTask(); releaseResStream(); releaseContext(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java new file mode 100644 index 0000000..a84936a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.cache.results; + +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.QueryInfo; + +/** + * Helper class during semantic analysis that indicates if the query can use the cache, + * or if the results from the query can be added to the results cache. + */ +public class CacheUsage { + + public enum CacheStatus { + CACHE_NOT_USED, + QUERY_USING_CACHE, + CAN_CACHE_QUERY_RESULTS, + }; + + private CacheUsage.CacheStatus status; + private CacheEntry cacheEntry; + private QueryInfo queryInfo; + + public CacheUsage(CacheStatus status, CacheEntry cacheEntry) { + this.status = status; + this.cacheEntry = cacheEntry; + } + + public CacheUsage(CacheStatus status, QueryInfo queryInfo) { + this.status = status; + this.queryInfo = queryInfo; + } + + public CacheUsage.CacheStatus getStatus() { + return status; + } + + public void setStatus(CacheUsage.CacheStatus status) { + this.status = status; + } + + public CacheEntry getCacheEntry() { + return cacheEntry; + } + + public void setCacheEntry(CacheEntry cacheEntry) { + this.cacheEntry = cacheEntry; + } + + public QueryInfo getQueryInfo() { + return queryInfo; + } + + public void setQueryInfo(QueryInfo queryInfo) { + this.queryInfo = queryInfo; + } +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java new file mode 100644 index 0000000..520e857 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java @@ -0,0 +1,667 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.cache.results; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.hooks.Entity; +import org.apache.hadoop.hive.ql.hooks.Entity.Type; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo; +import org.apache.hadoop.hive.ql.parse.TableAccessInfo; +import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.HiveOperation; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A class to handle management and lookup of cached Hive query results + */ +public class QueryResultsCache { + + private static final Logger LOG = LoggerFactory.getLogger(QueryResultsCache.class); + + public static class LookupInfo { + private String queryText; + + public LookupInfo(String queryText) { + super(); + this.queryText = queryText; + } + + public String getQueryText() { + return queryText; + } + } + + public static class QueryInfo { + private LookupInfo lookupInfo; + private HiveOperation hiveOperation; + private List resultSchema; + private TableAccessInfo tableAccessInfo; + private ColumnAccessInfo columnAccessInfo; + private HashSet inputs; + + public QueryInfo( + LookupInfo lookupInfo, + HiveOperation hiveOperation, + List resultSchema, + TableAccessInfo tableAccessInfo, + ColumnAccessInfo columnAccessInfo, + HashSet inputs) { + this.lookupInfo = lookupInfo; + this.hiveOperation = hiveOperation; + this.resultSchema = resultSchema; + this.tableAccessInfo = tableAccessInfo; + this.columnAccessInfo = columnAccessInfo; + this.inputs = inputs; + } + + public LookupInfo getLookupInfo() { + return lookupInfo; + } + + public void setLookupInfo(LookupInfo lookupInfo) { + this.lookupInfo = lookupInfo; + } + + public HiveOperation getHiveOperation() { + return hiveOperation; + } + + public void setHiveOperation(HiveOperation hiveOperation) { + this.hiveOperation = hiveOperation; + } + + public List getResultSchema() { + 
+      return resultSchema;
+    }
+
+    public void setResultSchema(List<FieldSchema> resultSchema) {
+      this.resultSchema = resultSchema;
+    }
+
+    public TableAccessInfo getTableAccessInfo() {
+      return tableAccessInfo;
+    }
+
+    public void setTableAccessInfo(TableAccessInfo tableAccessInfo) {
+      this.tableAccessInfo = tableAccessInfo;
+    }
+
+    public ColumnAccessInfo getColumnAccessInfo() {
+      return columnAccessInfo;
+    }
+
+    public void setColumnAccessInfo(ColumnAccessInfo columnAccessInfo) {
+      this.columnAccessInfo = columnAccessInfo;
+    }
+
+    public HashSet<ReadEntity> getInputs() {
+      return inputs;
+    }
+
+    public void setInputs(HashSet<ReadEntity> inputs) {
+      this.inputs = inputs;
+    }
+  }
+
+  public static class CacheEntry {
+    private QueryInfo queryInfo;
+    private FetchWork fetchWork;
+    private Path cachedResultsPath;
+
+    // Cache administration
+    private long createTime;
+    private long size;
+    private AtomicBoolean valid = new AtomicBoolean(false);
+    private AtomicInteger readers = new AtomicInteger(0);
+    private ScheduledFuture<?> invalidationFuture = null;
+
+    public boolean isValid() {
+      return valid.get();
+    }
+
+    public void releaseReader() {
+      int readerCount = 0;
+      synchronized (this) {
+        readerCount = readers.decrementAndGet();
+      }
+      LOG.debug("releaseReader: entry: {}, readerCount: {}", this, readerCount);
+      Preconditions.checkState(readerCount >= 0);
+
+      cleanupIfNeeded();
+    }
+
+    public String toString() {
+      return "CacheEntry query: [" + getQueryInfo().getLookupInfo().getQueryText() +
+          "], location: " + cachedResultsPath +
+          ", size: " + size;
+    }
+
+    public boolean addReader() {
+      boolean added = false;
+      int readerCount = 0;
+      synchronized (this) {
+        if (valid.get()) {
+          readerCount = readers.incrementAndGet();
+          // Only check the reader count when a reader was actually added; an invalid entry
+          // simply returns false so the caller can recompile the query.
+          Preconditions.checkState(readerCount > 0);
+          added = true;
+        }
+      }
+      LOG.debug("addReader: entry: {}, readerCount: {}", this, readerCount);
+      return added;
+    }
+
+    private int numReaders() {
+      return readers.get();
+    }
+
+    private void invalidate() {
+      boolean wasValid = setValidity(false);
+
+      if (wasValid) {
+        LOG.info("Invalidated cache entry: {}", this);
+
+        if (invalidationFuture != null) {
+          // The cache entry has just been invalidated, no need for the scheduled invalidation.
+          invalidationFuture.cancel(false);
+        }
+        cleanupIfNeeded();
+      }
+    }
+
+    /**
+     * Set the validity, returning the previous validity value.
+     * @param valid
+     * @return
+     */
+    private boolean setValidity(boolean valid) {
+      synchronized(this) {
+        return this.valid.getAndSet(valid);
+      }
+    }
+
+    private void cleanupIfNeeded() {
+      if (!isValid() && readers.get() <= 0) {
+        QueryResultsCache.cleanupEntry(this);
+      }
+    }
+
+    private String getQueryText() {
+      return getQueryInfo().getLookupInfo().getQueryText();
+    }
+
+    public FetchWork getFetchWork() {
+      return fetchWork;
+    }
+
+    public QueryInfo getQueryInfo() {
+      return queryInfo;
+    }
+
+    public Path getCachedResultsPath() {
+      return cachedResultsPath;
+    }
+  }
+
+  // Allow lookup by query string
+  private final Map<String, Set<CacheEntry>> queryMap = new HashMap<String, Set<CacheEntry>>();
+
+  // LRU. Could also implement LRU as a doubly linked list if CacheEntry keeps its node.
+  // Use synchronized map since even read actions cause the lru to get updated.
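+  // The LinkedHashMap below is constructed with accessOrder=true, so both lookups and inserts
+  // move an entry to the most-recently-used position.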
+ private final Map lru = Collections.synchronizedMap( + new LinkedHashMap(INITIAL_LRU_SIZE, LRU_LOAD_FACTOR, true)); + + private final HiveConf conf; + private Path cacheDirPath; + private long cacheSize = 0; + private long maxCacheSize; + private long maxEntrySize; + private long maxEntryLifetime; + private ReadWriteLock rwLock = new ReentrantReadWriteLock(); + + private QueryResultsCache(HiveConf configuration) throws IOException { + this.conf = configuration; + + // Set up cache directory + Path rootCacheDir = new Path(conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY)); + LOG.info("Initializing query results cache at {}", rootCacheDir); + Utilities.ensurePathIsWritable(rootCacheDir, conf); + + String currentCacheDirName = "results-" + UUID.randomUUID().toString(); + cacheDirPath = new Path(rootCacheDir, currentCacheDirName); + FileSystem fs = cacheDirPath.getFileSystem(conf); + String permString = conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIR_PERMISSION); + FsPermission fsPermission = new FsPermission(permString); + fs.mkdirs(cacheDirPath, fsPermission); + + // Results cache directory should be cleaned up at process termination. + fs.deleteOnExit(cacheDirPath); + + maxCacheSize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE); + maxEntrySize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE); + maxEntryLifetime = conf.getTimeVar( + HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_LIFETIME, + TimeUnit.MILLISECONDS); + + LOG.info("Query results cache: cacheDirectory {}, maxCacheSize {}, maxEntrySize {}, maxEntryLifetime {}", + cacheDirPath, maxCacheSize, maxEntrySize, maxEntryLifetime); + } + + private static final AtomicBoolean inited = new AtomicBoolean(false); + private static QueryResultsCache instance; + + public static void initialize(HiveConf conf) throws IOException { + if (!inited.getAndSet(true)) { + try { + instance = new QueryResultsCache(conf); + } catch (IOException err) { + inited.set(false); + throw err; + } + } + } + + public static QueryResultsCache getInstance() { + return instance; + } + + /** + * Check if the cache contains an entry for the requested CacheInfo. + * If a cache entry is returned, the caller will need to call addReader() to increment + * the reader count for the cache entry. + * @param request + * @return The cached result if there is a match in the cache, or null if no match is found. + */ + public CacheEntry lookup(LookupInfo request) { + CacheEntry result = null; + + LOG.debug("QueryResultsCache lookup for query: {}", request.queryText); + + Lock readLock = rwLock.readLock(); + try { + readLock.lock(); + Set candidates = queryMap.get(request.queryText); + if (candidates != null) { + for (CacheEntry candidate : candidates) { + if (entryMatches(request, candidate)) { + result = candidate; + break; + } + } + + if (result != null) { + lru.get(result); // Update LRU + + if (!result.isValid()) { + // Entry is in the cache, but not valid. + // This can happen when the entry is first added, before the data has been moved + // to the results cache directory. We cannot use this entry yet. + result = null; + } + + // One option is to call addReader() now during lookup, + // to ensure the caller is able to use the query results. + } + } + } finally { + readLock.unlock(); + } + + LOG.debug("QueryResultsCache lookup result: {}", result); + + return result; + } + + /** + * Add an entry to the query results cache. 
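+   * The entry is first added in an invalid state while the cache write lock is held; it only
+   * becomes usable by readers once the result data has been moved into the cache directory
+   * and the entry has been marked valid.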
+ * Important: Adding the entry to the cache will increment the reader count for the cache entry. + * CacheEntry.releaseReader() should be called when the caller is done with the cache entry. + * + * @param queryInfo + * @param fetchWork + * @return The entry if added to the cache. null if the entry is not added. + */ + public CacheEntry addToCache(QueryInfo queryInfo, FetchWork fetchWork) { + + CacheEntry addedEntry = null; + boolean dataDirMoved = false; + Path queryResultsPath = null; + Path cachedResultsPath = null; + String queryText = queryInfo.getLookupInfo().getQueryText(); + + // TODO: should we remove other candidate entries if they are equivalent to these query results? + try { + CacheEntry potentialEntry = new CacheEntry(); + potentialEntry.queryInfo = queryInfo; + queryResultsPath = fetchWork.getTblDir(); + FileSystem resultsFs = queryResultsPath.getFileSystem(conf); + ContentSummary cs = resultsFs.getContentSummary(queryResultsPath); + potentialEntry.size = cs.getLength(); + + Lock writeLock = rwLock.writeLock(); + try { + writeLock.lock(); + + if (!shouldEntryBeAdded(potentialEntry)) { + return null; + } + if (!clearSpaceForCacheEntry(potentialEntry)) { + return null; + } + + LOG.info("Adding cache entry for query '{}'", queryText); + + // Add the entry to the cache structures while under write lock. Do not mark the entry + // as valid yet, since the query results have not yet been moved to the cache directory. + // Do the data move after unlocking since it might take time. + // Mark the entry as valid once the data has been moved to the cache directory. + Set entriesForQuery = queryMap.get(queryText); + if (entriesForQuery == null) { + entriesForQuery = new HashSet(); + queryMap.put(queryText, entriesForQuery); + } + entriesForQuery.add(potentialEntry); + lru.put(potentialEntry, potentialEntry); + cacheSize += potentialEntry.size; + addedEntry = potentialEntry; + + } finally { + writeLock.unlock(); + } + + // Move the query results to the query cache directory. + cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath); + dataDirMoved = true; + LOG.info("Moved query results from {} to {} (size {}) for query '{}'", + queryResultsPath, cachedResultsPath, cs.getLength(), queryText); + + // Create a new FetchWork to reference the new cache location. + FetchWork fetchWorkForCache = + new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit()); + fetchWorkForCache.setCachedResult(true); + addedEntry.fetchWork = fetchWorkForCache; + addedEntry.cachedResultsPath = cachedResultsPath; + addedEntry.createTime = System.currentTimeMillis(); + addedEntry.setValidity(true); + + // Mark this entry as being in use. Caller will need to release later. + addedEntry.addReader(); + + scheduleEntryInvalidation(addedEntry); + } catch (Exception err) { + LOG.error("Failed to create cache entry for query results for query: " + queryText, err); + + if (addedEntry != null) { + // If the entry was already added to the cache when we hit error, clean up properly. + + if (dataDirMoved) { + // If data was moved from original location to cache directory, we need to move it back! 
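+          // Otherwise the original FetchTask for this query would point at a directory
+          // that no longer contains the results.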
+ LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath); + try { + FileSystem fs = cachedResultsPath.getFileSystem(conf); + fs.rename(cachedResultsPath, queryResultsPath); + addedEntry.cachedResultsPath = null; + } catch (Exception err2) { + String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText; + LOG.error(errMsg); + throw new RuntimeException(errMsg); + } + } + + addedEntry.invalidate(); + if (addedEntry.numReaders() > 0) { + addedEntry.releaseReader(); + } + } + + return null; + } + + return addedEntry; + } + + public void clear() { + Lock writeLock = rwLock.writeLock(); + try { + writeLock.lock(); + LOG.info("Clearing the results cache"); + for (CacheEntry entry : lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY)) { + try { + removeEntry(entry); + } catch (Exception err) { + LOG.error("Error removing cache entry " + entry, err); + } + } + } finally { + writeLock.unlock(); + } + } + + public long getSize() { + Lock readLock = rwLock.readLock(); + try { + readLock.lock(); + return cacheSize; + } finally { + readLock.unlock(); + } + } + + private static final int INITIAL_LRU_SIZE = 16; + private static final float LRU_LOAD_FACTOR = 0.75f; + private static final CacheEntry[] EMPTY_CACHEENTRY_ARRAY = {}; + + private boolean entryMatches(LookupInfo lookupInfo, CacheEntry entry) { + QueryInfo queryInfo = entry.getQueryInfo(); + for (ReadEntity readEntity : queryInfo.getInputs()) { + // Check that the tables used do not resolve to temp tables. + if (readEntity.getType() == Type.TABLE) { + Table tableUsed = readEntity.getTable(); + Map tempTables = + SessionHiveMetaStoreClient.getTempTablesForDatabase(tableUsed.getDbName()); + if (tempTables != null && tempTables.containsKey(tableUsed.getTableName())) { + LOG.info("{} resolves to a temporary table in the current session. This query cannot use the cache.", + tableUsed.getTableName()); + return false; + } + } + } + + return true; + } + + private void removeEntry(CacheEntry entry) { + entry.invalidate(); + removeFromLookup(entry); + lru.remove(entry); + // Should the cache size be updated here, or after the result data has actually been deleted? + cacheSize -= entry.size; + } + + private void removeFromLookup(CacheEntry entry) { + String queryString = entry.getQueryText(); + Set entries = queryMap.get(queryString); + Preconditions.checkState(entries != null); + boolean deleted = entries.remove(entry); + Preconditions.checkState(deleted); + if (entries.isEmpty()) { + queryMap.remove(queryString); + } + } + + private void calculateEntrySize(CacheEntry entry, FetchWork fetchWork) throws IOException { + Path queryResultsPath = fetchWork.getTblDir(); + FileSystem resultsFs = queryResultsPath.getFileSystem(conf); + ContentSummary cs = resultsFs.getContentSummary(queryResultsPath); + entry.size = cs.getLength(); + } + + /** + * Determines if the cache entry should be added to the results cache. + */ + private boolean shouldEntryBeAdded(CacheEntry entry) { + // Assumes the cache lock has already been taken. 
+ if (maxEntrySize >= 0 && entry.size > maxEntrySize) { + LOG.debug("Cache entry size {} larger than max entry size ({})", entry.size, maxEntrySize); + return false; + } + + return true; + } + + private Path moveResultsToCacheDirectory(Path queryResultsPath) throws IOException { + String dirName = UUID.randomUUID().toString(); + Path cachedResultsPath = new Path(cacheDirPath, dirName); + FileSystem fs = cachedResultsPath.getFileSystem(conf); + fs.rename(queryResultsPath, cachedResultsPath); + return cachedResultsPath; + } + + private boolean hasSpaceForCacheEntry(CacheEntry entry) { + if (maxCacheSize >= 0) { + return (cacheSize + entry.size) <= maxCacheSize; + } + // Negative max cache size means unbounded. + return true; + } + + private boolean clearSpaceForCacheEntry(CacheEntry entry) { + if (hasSpaceForCacheEntry(entry)) { + return true; + } + + LOG.info("Clearing space for cache entry for query: [{}] with size {}", + entry.getQueryText(), entry.size); + + // Entries should be in LRU order in the keyset iterator. + CacheEntry[] entries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY); + for (CacheEntry removalCandidate : entries) { + if (!removalCandidate.isValid()) { + // Likely an entry which is still getting its results moved to the cache directory. + continue; + } + // Only delete the entry if it has no readers. + if (!(removalCandidate.numReaders() > 0)) { + LOG.info("Removing entry: {}", removalCandidate); + removeEntry(removalCandidate); + if (hasSpaceForCacheEntry(entry)) { + return true; + } + } + } + + LOG.info("Could not free enough space for cache entry for query: [{}] withe size {}", + entry.getQueryText(), entry.size); + return false; + } + + + @VisibleForTesting + public static void cleanupInstance() { + // This should only ever be called in testing scenarios. + // There should not be any other users of the cache or its entries or this may mess up cleanup. + if (inited.get()) { + getInstance().clear(); + instance = null; + inited.set(false); + } + } + + private static ScheduledExecutorService invalidationExecutor = null; + private static ExecutorService deletionExecutor = null; + + static { + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryResultsCache %d").build(); + invalidationExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); + deletionExecutor = Executors.newSingleThreadExecutor(threadFactory); + } + + private void scheduleEntryInvalidation(final CacheEntry entry) { + if (maxEntryLifetime >= 0) { + // Schedule task to invalidate cache entry. 
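+      // maxEntryLifetime was already converted to milliseconds when the cache was initialized.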
+ ScheduledFuture future = invalidationExecutor.schedule(new Runnable() { + @Override + public void run() { + entry.invalidate(); + } + }, maxEntryLifetime, TimeUnit.MILLISECONDS); + entry.invalidationFuture = future; + } + } + + private static void cleanupEntry(final CacheEntry entry) { + Preconditions.checkState(!entry.isValid()); + + if (entry.cachedResultsPath != null) { + deletionExecutor.execute(new Runnable() { + @Override + public void run() { + Path path = entry.cachedResultsPath; + LOG.info("Cache directory cleanup: deleting {}", path); + try { + FileSystem fs = entry.cachedResultsPath.getFileSystem(getInstance().conf); + fs.delete(entry.cachedResultsPath, true); + } catch (Exception err) { + LOG.error("Error while trying to delete " + path, err); + } + } + }); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 2e1fd37..0c6b369 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -2626,6 +2626,10 @@ public static boolean isEmptyPath(Configuration job, Path dirPath) throws IOExce return getTasks(tasks, new TaskFilterFunction<>(ExecDriver.class)); } + public static int getNumClusterJobs(List> tasks) { + return getMRTasks(tasks).size() + getTezTasks(tasks).size() + getSparkTasks(tasks).size(); + } + static class TaskFilterFunction implements DAGTraversal.Function { private Set> visited = new HashSet<>(); private Class requiredType; @@ -4442,4 +4446,34 @@ public static boolean isHiveManagedFile(Path path) { return AcidUtils.ORIGINAL_PATTERN.matcher(path.getName()).matches() || AcidUtils.ORIGINAL_PATTERN_COPY.matcher(path.getName()).matches(); } + + /** + * Checks if path passed in exists and has writable permissions. + * The path will be created if it does not exist. + * @param rootHDFSDirPath + * @param conf + */ + public static void ensurePathIsWritable(Path rootHDFSDirPath, HiveConf conf) throws IOException { + FsPermission writableHDFSDirPermission = new FsPermission((short)00733); + FileSystem fs = rootHDFSDirPath.getFileSystem(conf); + if (!fs.exists(rootHDFSDirPath)) { + Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true); + } + FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission(); + if (rootHDFSDirPath != null && rootHDFSDirPath.toUri() != null) { + String schema = rootHDFSDirPath.toUri().getScheme(); + LOG.debug( + "HDFS dir: " + rootHDFSDirPath + " with schema " + schema + ", permission: " + + currentHDFSDirPermission); + } else { + LOG.debug( + "HDFS dir: " + rootHDFSDirPath + ", permission: " + currentHDFSDirPermission); + } + // If the root HDFS scratch dir already exists, make sure it is writeable. + if (!((currentHDFSDirPermission.toShort() & writableHDFSDirPermission + .toShort()) == writableHDFSDirPermission.toShort())) { + throw new RuntimeException("The dir: " + rootHDFSDirPath + + " on HDFS should be writable. 
Current permissions are: " + currentHDFSDirPermission); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java index f0dd167..1f8a48c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java @@ -967,6 +967,44 @@ public static AggregateCall createSingleArgAggCall(String funcName, RelOptCluste return AggregateCall.create(aggFunction, false, argList, -1, aggFnRetType, null); } + /** + * Is the expression usable for query materialization. + */ + public static boolean isMaterializable(RexNode expr) { + return (checkMaterializable(expr) == null); + } + + /** + * Check if the expression is usable for query materialization, returning the first failing expression. + */ + public static RexCall checkMaterializable(RexNode expr) { + boolean deterministic = true; + RexCall failingCall = null; + + if (expr == null) { + return null; + } + + RexVisitor visitor = new RexVisitorImpl(true) { + @Override + public Void visitCall(org.apache.calcite.rex.RexCall call) { + // non-deterministic functions as well as runtime constants are not materializable. + if (!call.getOperator().isDeterministic() || call.getOperator().isDynamicFunction()) { + throw new Util.FoundOne(call); + } + return super.visitCall(call); + } + }; + + try { + expr.accept(visitor); + } catch (Util.FoundOne e) { + failingCall = (RexCall) e.getNode(); + } + + return failingCall; + } + public static HiveTableFunctionScan createUDTFForSetOp(RelOptCluster cluster, RelNode input) throws SemanticException { RelTraitSet traitSet = TraitsUtil.getDefaultTraitSet(cluster); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java new file mode 100644 index 0000000..ca96b64 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer.calcite; + +import java.util.List; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Util; + +import org.apache.hadoop.hive.metastore.TableType; + +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Checks the query plan for conditions that would make the plan unsuitable for + * materialized views or query caching: + * - References to temporary or external tables + * - References to non-determinisitc functions. + */ +public class HiveRelOpMaterializationValidator extends HiveRelShuttleImpl { + static final Logger LOG = LoggerFactory.getLogger(HiveRelOpMaterializationValidator.class); + + protected String invalidMaterializationReason; + + public void validateQueryMaterialization(RelNode relNode) { + relNode.accept(this); + } + + @Override + public RelNode visit(TableScan scan) { + if (scan instanceof HiveTableScan) { + HiveTableScan hiveScan = (HiveTableScan) scan; + RelOptHiveTable relOptHiveTable = (RelOptHiveTable) hiveScan.getTable(); + Table tab = relOptHiveTable.getHiveTableMD(); + if (tab.isTemporary()) { + setInvalidMaterializationReason(tab.getTableName() + " is a temporary table"); + return scan; + } + TableType tt = tab.getTableType(); + if (tab.getTableType() == TableType.EXTERNAL_TABLE) { + setInvalidMaterializationReason(tab.getFullyQualifiedName() + " is an external table"); + return scan; + } + } + + return scan; + } + + public RelNode visit(HiveProject project) { + // Check expressions + for (RexNode expr : project.getChildExps()) { + if (!checkExpr(expr)) { + return project; + } + } + return super.visit(project); + } + + public RelNode visit(HiveFilter filter) { + // Check expressions + for (RexNode expr : filter.getChildExps()) { + if (!checkExpr(expr)) { + return filter; + } + } + return super.visit(filter); + } + + public RelNode visit(HiveJoin join) { + // Child Expressions + for (RexNode expr : join.getChildExps()) { + if (!checkExpr(expr)) { + return join; + } + } + + // Join filter + if (!checkExpr(join.getJoinFilter())) { + return join; + } + + // Join predicates + // TODO: is this correct?? 
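+    // Walk the key expressions of every equi-join predicate element for each join input.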
+ JoinPredicateInfo predicateInfo = join.getJoinPredicateInfo(); + for (int inputIdx = 0; inputIdx < join.getInputs().size(); ++inputIdx) { + for (int predIdx = 0; predIdx < predicateInfo.getEquiJoinPredicateElements().size(); ++predIdx) { + List joinKeys = predicateInfo.getEquiJoinPredicateElements().get(predIdx).getJoinExprs(inputIdx); + for (RexNode expr : joinKeys) { + if (!checkExpr(expr)) { + return join; + } + } + } + } + + return super.visit(join); + } + + private boolean checkExpr(RexNode expr) { + RexCall invalidCall = HiveCalciteUtil.checkMaterializable(expr); + if (invalidCall != null) { + setInvalidMaterializationReason( + invalidCall.getOperator().getName() + " is not a deterministic function"); + return false; + } + return true; + } + + public String getInvalidMaterializationReason() { + return invalidMaterializationReason; + } + + public void setInvalidMaterializationReason(String invalidMaterializationReason) { + this.invalidMaterializationReason = invalidMaterializationReason; + } + + public boolean isValidMaterialization() { + return invalidMaterializationReason == null; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index 372cfad..ead96c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -55,6 +55,8 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryProperties; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.cache.results.CacheUsage; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -148,6 +150,9 @@ protected LineageInfo linfo; protected TableAccessInfo tableAccessInfo; protected ColumnAccessInfo columnAccessInfo; + + protected CacheUsage cacheUsage; + /** * Columns accessed by updates */ @@ -1940,4 +1945,12 @@ protected HiveTxnManager getTxnMgr() { } return SessionState.get().getTxnMgr(); } + + public CacheUsage getCacheUsage() { + return cacheUsage; + } + + public void setCacheUsage(CacheUsage cacheUsage) { + this.cacheUsage = cacheUsage; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 85a1f34..cf2bc13 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -154,6 +154,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.HiveDefaultRelMetadataProvider; import org.apache.hadoop.hive.ql.optimizer.calcite.HivePlannerContext; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelOpMaterializationValidator; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRexExecutorImpl; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeSystemImpl; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; @@ -1442,6 +1443,16 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu } perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Plan generation"); + // Validate query materialization (materialized views, query results caching. 
+ // This check needs to occur before constant folding, which may remove some + // function calls from the query plan. + HiveRelOpMaterializationValidator matValidator = new HiveRelOpMaterializationValidator(); + matValidator.validateQueryMaterialization(calciteGenPlan); + if (!matValidator.isValidMaterialization()) { + String reason = matValidator.getInvalidMaterializationReason(); + setInvalidQueryMaterializationReason(reason); + } + // Create executor RexExecutor executorProvider = new HiveRexExecutorImpl(optCluster); calciteGenPlan.getCluster().getPlanner().setExecutor(executorProvider); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java index ae2ec3d..5789ee0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java @@ -675,6 +675,9 @@ public void setNoScanAnalyzeCommand(boolean isNoScanAnalyzeCommand) { return insertOverwriteTables; } + public boolean hasInsertTables() { + return this.insertIntoTables.size() > 0 || this.insertOverwriteTables.size() > 0; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 28e1041..8c4cb9f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -83,10 +83,13 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryProperties; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.cache.results.CacheUsage; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; +import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.FunctionInfo; @@ -356,6 +359,8 @@ HiveParser.TOK_ORDERBY, HiveParser.TOK_WINDOWSPEC, HiveParser.TOK_CLUSTERBY, HiveParser.TOK_DISTRIBUTEBY, HiveParser.TOK_SORTBY); + private String invalidQueryMaterializationReason; + static class Phase1Ctx { String dest; int nextNum; @@ -11065,6 +11070,77 @@ private Table getTableObjectByName(String tableName) throws HiveException { return getTableObjectByName(tableName, true); } + private static void walkASTAndQualifyNames(ASTNode ast, + Set cteAlias, Context ctx, Hive db, Set ignoredTokens, UnparseTranslator unparseTranslator) { + Queue queue = new LinkedList<>(); + queue.add(ast); + while (!queue.isEmpty()) { + ASTNode astNode = (ASTNode) queue.poll(); + if (astNode.getToken().getType() == HiveParser.TOK_TABNAME) { + // Check if this is table name is qualified or not + String tabIdName = getUnescapedName(astNode).toLowerCase(); + // if alias to CTE contains the table name, we do not do the translation because + // cte is actually a subquery. + if (!cteAlias.contains(tabIdName)) { + unparseTranslator.addTableNameTranslation(astNode, SessionState.get().getCurrentDatabase()); + } + } + + if (astNode.getChildCount() > 0 && !ignoredTokens.contains(astNode.getToken().getType())) { + for (Node child : astNode.getChildren()) { + queue.offer(child); + } + } + } + } + + // Walk through the AST. 
+ // Replace all TOK_TABREF with fully qualified table name, if it is not already fully qualified. + protected String rewriteQueryWithQualifiedNames(ASTNode ast, TokenRewriteStream tokenRewriteStream) throws SemanticException { + UnparseTranslator unparseTranslator = new UnparseTranslator(conf); + unparseTranslator.enable(); + + // 1. collect information about CTE if there is any. + // The base table of CTE should be masked. + // The CTE itself should not be masked in the references in the following main query. + Set cteAlias = new HashSet<>(); + if (ast.getChildCount() > 0 + && HiveParser.TOK_CTE == ((ASTNode) ast.getChild(0)).getToken().getType()) { + // the structure inside CTE is like this + // TOK_CTE + // TOK_SUBQUERY + // sq1 (may refer to sq2) + // ... + // TOK_SUBQUERY + // sq2 + ASTNode cte = (ASTNode) ast.getChild(0); + // we start from sq2, end up with sq1. + for (int index = cte.getChildCount() - 1; index >= 0; index--) { + ASTNode subq = (ASTNode) cte.getChild(index); + String alias = unescapeIdentifier(subq.getChild(1).getText()); + if (cteAlias.contains(alias)) { + throw new SemanticException("Duplicate definition of " + alias); + } else { + cteAlias.add(alias); + walkASTAndQualifyNames(ast, cteAlias, ctx, db, ignoredTokens, unparseTranslator); + } + } + // walk the other part of ast + for (int index = 1; index < ast.getChildCount(); index++) { + walkASTAndQualifyNames(ast, cteAlias, ctx, db, ignoredTokens, unparseTranslator); + } + } + // there is no CTE, walk the whole AST + else { + walkASTAndQualifyNames(ast, cteAlias, ctx, db, ignoredTokens, unparseTranslator); + } + + unparseTranslator.applyTranslations(tokenRewriteStream); + String rewrittenQuery = tokenRewriteStream.toString( + ast.getTokenStartIndex(), ast.getTokenStopIndex()); + return rewrittenQuery; + } + private static void walkASTMarkTABREF(TableMask tableMask, ASTNode ast, Set cteAlias, Context ctx, Hive db, Map tabNameToTabObject, Set ignoredTokens) throws SemanticException { @@ -11403,6 +11479,20 @@ void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticExce } } + // Check query results cache. + // If no masking/filtering required, then we can check the cache now, before + // generating the operator tree and going through CBO. + // Otherwise we have to wait until after the masking/filtering step. + boolean isCacheEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED); + QueryResultsCache.LookupInfo lookupInfo = null; + boolean needsTransform = needsTransform(); + if (isCacheEnabled && !needsTransform && queryTypeCanUseCache()) { + lookupInfo = createLookupInfoForQuery(ast); + if (checkResultsCache(lookupInfo)) { + return; + } + } + // 2. Gen OP Tree from resolved Parse Tree Operator sinkOp = genOPTree(ast, plannerCtx); @@ -11425,6 +11515,16 @@ void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticExce } } + // Check query results cache + // In the case that row or column masking/filtering was required, the cache must be checked + // here, after applying the masking/filtering rewrite rules to the AST. + if (isCacheEnabled && needsTransform && queryTypeCanUseCache()) { + lookupInfo = createLookupInfoForQuery(ast); + if (checkResultsCache(lookupInfo)) { + return; + } + } + // 3. 
Deduce Resultset Schema if (createVwDesc != null && !this.ctx.isCboSucceeded()) { resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver()); @@ -11559,6 +11659,15 @@ void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticExce putAccessedColumnsToReadEntity(inputs, columnAccessInfo); } + if (isCacheEnabled && lookupInfo != null) { + if (queryCanBeCached()) { + QueryResultsCache.QueryInfo queryInfo = createCacheQueryInfoForQuery(lookupInfo); + + // Specify that the results of this query can be cached. + setCacheUsage(new CacheUsage( + CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS, queryInfo)); + } + } } private void putAccessedColumnsToReadEntity(HashSet inputs, ColumnAccessInfo columnAccessInfo) { @@ -13770,6 +13879,149 @@ public void setLoadFileWork(List loadFileWork) { this.loadFileWork = loadFileWork; } + /** + * Generate the query string for this query (with fully resolved table references). + * @return The query string with resolved references. NULL if an error occurred. + */ + private String getQueryStringForCache(ASTNode ast) { + // Use the UnparseTranslator to resolve unqualified table names. + String queryString = ctx.getTokenRewriteStream().toString(ast.getTokenStartIndex(), ast.getTokenStopIndex()); + + // Re-using the TokenRewriteStream map for views so we do not overwrite the current TokenRewriteStream + String rewriteStreamName = "__qualified_query_string__"; + ASTNode astNode; + try { + astNode = ParseUtils.parse(queryString, ctx, rewriteStreamName); + TokenRewriteStream tokenRewriteStream = ctx.getViewTokenRewriteStream(rewriteStreamName); + String fullyQualifiedQuery = rewriteQueryWithQualifiedNames(astNode, tokenRewriteStream); + return fullyQualifiedQuery; + } catch (Exception err) { + LOG.error("Unexpected error while reparsing the query string [" + queryString + "]", err); + // Don't fail the query - just return null (caller should skip cache lookup). + return null; + } + } + + private QueryResultsCache.LookupInfo createLookupInfoForQuery(ASTNode astNode) { + QueryResultsCache.LookupInfo lookupInfo = null; + String queryString = getQueryStringForCache(astNode); + if (queryString != null) { + lookupInfo = new QueryResultsCache.LookupInfo(queryString); + } + return lookupInfo; + } + + /** + * Set the query plan to use cache entry passed in to return the query results. + * @param cacheEntry The results cache entry that will be used to resolve the query. + */ + private void useCachedResult(QueryResultsCache.CacheEntry cacheEntry) { + // Change query FetchTask to use new location specified in results cache. + FetchTask fetchTask = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork(), conf); + setFetchTask(fetchTask); + + queryState.setCommandType(cacheEntry.getQueryInfo().getHiveOperation()); + resultSchema = cacheEntry.getQueryInfo().getResultSchema(); + setTableAccessInfo(cacheEntry.getQueryInfo().getTableAccessInfo()); + setColumnAccessInfo(cacheEntry.getQueryInfo().getColumnAccessInfo()); + inputs.addAll(cacheEntry.getQueryInfo().getInputs()); + + // Indicate that the query will use a cached result. 
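+    // The Driver will call addReader() on this cache entry when the query is executed.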
+ setCacheUsage(new CacheUsage( + CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry)); + } + + private QueryResultsCache.QueryInfo createCacheQueryInfoForQuery(QueryResultsCache.LookupInfo lookupInfo) { + return new QueryResultsCache.QueryInfo(lookupInfo, queryState.getHiveOperation(), + resultSchema, getTableAccessInfo(), getColumnAccessInfo(), inputs); + } + + /** + * Some initial checks for a query to see if we can look this query up in the results cache. + */ + private boolean queryTypeCanUseCache() { + if (this instanceof ColumnStatsSemanticAnalyzer) { + // Column stats generates "select compute_stats() .." queries. + // Disable caching for these. + return false; + } + + if (queryState.getHiveOperation() != HiveOperation.QUERY) { + return false; + } + + if (qb.getParseInfo().isAnalyzeCommand()) { + return false; + } + + if (qb.getParseInfo().hasInsertTables()) { + return false; + } + + return true; + } + + private boolean needsTransform() { + return SessionState.get().getAuthorizerV2() != null && + SessionState.get().getAuthorizerV2().needTransform(); + } + + /** + * Called after a query plan has been generated, to determine if the results of this query + * can be added to the results cache. + */ + private boolean queryCanBeCached() { + if (!queryTypeCanUseCache()) { + LOG.info("Not eligible for results caching - wrong query type"); + return false; + } + + // Query should have a fetch task. + if (getFetchTask() == null) { + LOG.info("Not eligible for results caching - no fetch task"); + return false; + } + + // At least one mr/tez/spark job + if (Utilities.getNumClusterJobs(getRootTasks()) == 0) { + LOG.info("Not eligible for results caching - no mr/tez/spark jobs"); + return false; + } + + if (!isValidQueryMaterialization()) { + LOG.info("Not eligible for results caching - {}", getInvalidQueryMaterializationReason()); + return false; + } + + return true; + } + + /** + * Check the query results cache to see if the query represented by the lookupInfo can be + * answered using the results cache. If the cache contains a suitable entry, the semantic analyzer + * will be configured to use the found cache entry to anwer the query. + */ + private boolean checkResultsCache(QueryResultsCache.LookupInfo lookupInfo) { + if (lookupInfo == null) { + return false; + } + try { + // In case this has not been initialized elsewhere. + QueryResultsCache.initialize(conf); + } catch (Exception err) { + throw new IllegalStateException(err); + } + QueryResultsCache.CacheEntry cacheEntry = QueryResultsCache.getInstance().lookup(lookupInfo); + if (cacheEntry != null) { + // Use the cache rather than full query execution. + useCachedResult(cacheEntry); + + // At this point the caller should return from semantic analysis. 
+ return true; + } + return false; + } + private static final class ColsAndTypes { public ColsAndTypes(String cols, String colTypes) { this.cols = cols; @@ -13778,4 +14030,17 @@ public ColsAndTypes(String cols, String colTypes) { public String cols; public String colTypes; } + + public String getInvalidQueryMaterializationReason() { + return invalidQueryMaterializationReason; + } + + public void setInvalidQueryMaterializationReason( + String invalidQueryMaterializationReason) { + this.invalidQueryMaterializationReason = invalidQueryMaterializationReason; + } + + public boolean isValidQueryMaterialization() { + return (invalidQueryMaterializationReason == null); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java index 7243dc7..1f139c8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java @@ -75,6 +75,11 @@ */ private boolean isUsingThriftJDBCBinarySerDe = false; + /** + * Whether this FetchWork is returning a cached query result + */ + private boolean isCachedResult = false; + public boolean isHiveServerQuery() { return isHiveServerQuery; } @@ -364,4 +369,12 @@ public FetchExplainVectorization getMapExplainVectorization() { } return new FetchExplainVectorization(this); } + @Explain(displayName = "Cached Query Result", displayOnlyOnTrue = true, explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public boolean isCachedResult() { + return isCachedResult; + } + + public void setCachedResult(boolean isCachedResult) { + this.isCachedResult = isCachedResult; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 8af19d8..3f0f148 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -720,27 +720,7 @@ private void createSessionDirs(String userName) throws IOException { */ private Path createRootHDFSDir(HiveConf conf) throws IOException { Path rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR)); - FsPermission writableHDFSDirPermission = new FsPermission((short)00733); - FileSystem fs = rootHDFSDirPath.getFileSystem(conf); - if (!fs.exists(rootHDFSDirPath)) { - Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true); - } - FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission(); - if (rootHDFSDirPath != null && rootHDFSDirPath.toUri() != null) { - String schema = rootHDFSDirPath.toUri().getScheme(); - LOG.debug( - "HDFS root scratch dir: " + rootHDFSDirPath + " with schema " + schema + ", permission: " + - currentHDFSDirPermission); - } else { - LOG.debug( - "HDFS root scratch dir: " + rootHDFSDirPath + ", permission: " + currentHDFSDirPermission); - } - // If the root HDFS scratch dir already exists, make sure it is writeable. - if (!((currentHDFSDirPermission.toShort() & writableHDFSDirPermission - .toShort()) == writableHDFSDirPermission.toShort())) { - throw new RuntimeException("The root scratch dir: " + rootHDFSDirPath - + " on HDFS should be writable. 
Current permissions are: " + currentHDFSDirPermission); - } + Utilities.ensurePathIsWritable(rootHDFSDirPath, conf); return rootHDFSDirPath; } diff --git a/ql/src/test/queries/clientpositive/results_cache_1.q b/ql/src/test/queries/clientpositive/results_cache_1.q new file mode 100644 index 0000000..0c9010b --- /dev/null +++ b/ql/src/test/queries/clientpositive/results_cache_1.q @@ -0,0 +1,62 @@ + +set hive.query.results.cache.enabled=true; + +explain +select count(*) from src a join src b on (a.key = b.key); +select count(*) from src a join src b on (a.key = b.key); + +set test.comment="Cache should be used for this query"; +set test.comment; +explain +select count(*) from src a join src b on (a.key = b.key); +select count(*) from src a join src b on (a.key = b.key); + +set hive.query.results.cache.enabled=false; +set test.comment="Cache is disabled, should not be used here."; +set test.comment; +explain +select count(*) from src a join src b on (a.key = b.key); + +create database db1; +use db1; +create table src as select key, value from default.src; + +set hive.query.results.cache.enabled=true; +set test.comment="Same query string, but different current database. Cache should not be used since unqualified tablenames resolve to different tables"; +set test.comment; +explain +select count(*) from src a join src b on (a.key = b.key); + + +-- Union +select * from src where key = 0 +union all +select * from src where key = 2; + +set test.comment="Union all. Cache should be used now"; +set test.comment; +explain +select * from src where key = 0 +union all +select * from src where key = 2; + +select * from src where key = 0 +union all +select * from src where key = 2; + + +-- CTE +with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key; + +set test.comment="CTE. Cache should be used now"; +set test.comment; +explain +with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key; + +with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key; diff --git a/ql/src/test/queries/clientpositive/results_cache_2.q b/ql/src/test/queries/clientpositive/results_cache_2.q new file mode 100644 index 0000000..96a9092 --- /dev/null +++ b/ql/src/test/queries/clientpositive/results_cache_2.q @@ -0,0 +1,41 @@ + +set hive.query.results.cache.enabled=true; +set hive.fetch.task.conversion=more; + +-- Test 1: fetch task +explain +select key, value from src where key=0; +select key, value from src where key=0; + +set test.comment=Query only requires fetch task - should not use results cache; +set test.comment; +explain +select key, value from src where key=0; + + +-- Test 2: deterministic function should use cache. +select c1, count(*) +from (select sign(value) c1, value from src where key < 10) q +group by c1; + +set test.comment=This query should use the cache; +set test.comment; +explain +select c1, count(*) +from (select sign(value) c1, value from src where key < 10) q +group by c1; + +-- Test 3: non-deterministic functions should not be cached +-- Set current timestamp config to get repeatable result. 
+set hive.test.currenttimestamp=2012-01-01 01:02:03; + +select c1, count(*) +from (select current_timestamp c1, value from src where key < 10) q +group by c1; + +set test.comment=Queries using non-deterministic functions should not use results cache; +set test.comment; +explain +select c1, count(*) +from (select current_timestamp c1, value from src where key < 10) q +group by c1; diff --git a/ql/src/test/queries/clientpositive/results_cache_capacity.q b/ql/src/test/queries/clientpositive/results_cache_capacity.q new file mode 100644 index 0000000..9f54577 --- /dev/null +++ b/ql/src/test/queries/clientpositive/results_cache_capacity.q @@ -0,0 +1,52 @@ + +set hive.query.results.cache.enabled=true; + +-- Allow results cache to hold entries up to 125 bytes +-- The single row queries are small enough to fit in the cache (103 bytes) +-- But the cache is only large enough to hold up to 2 entries at that size. +-- This may need to be adjusted if the sizes below change +set hive.query.results.cache.max.size=250; +set hive.query.results.cache.max.entry.size=125; + + +select key, count(*) from src where key = 0 group by key; +set test.comment=Q1 should be cached; +set test.comment; +explain +select key, count(*) from src where key = 0 group by key; + + +select key, count(*) from src where key = 2 group by key; +set test.comment=Q2 should now be cached; +set test.comment; +explain +select key, count(*) from src where key = 2 group by key; + +set test.comment=Q1 should still be cached; +set test.comment; +explain +select key, count(*) from src where key = 0 group by key; + +-- Add another query to the cache. Cache not large enough to hold all 3 queries. +-- Due to LRU (Q1 last looked up), Q2 should no longer be in the cache. +select key, count(*) from src where key = 4 group by key; +set test.comment=Q3 should now be cached; +set test.comment; +explain +select key, count(*) from src where key = 4 group by key; + +set test.comment=Q1 should still be cached; +set test.comment; +explain +select key, count(*) from src where key = 0 group by key; + +set test.comment=Q2 should no longer be in the cache; +set test.comment; +explain +select key, count(*) from src where key = 2 group by key; + +-- Query should not be cached because it exceeds the max entry size (183 bytes). +select key, count(*) from src where key < 10 group by key; +set test.comment=Q4 is too large to be cached; +explain +select key, count(*) from src where key < 10 group by key; diff --git a/ql/src/test/queries/clientpositive/results_cache_lifetime.q b/ql/src/test/queries/clientpositive/results_cache_lifetime.q new file mode 100644 index 0000000..60ffe96 --- /dev/null +++ b/ql/src/test/queries/clientpositive/results_cache_lifetime.q @@ -0,0 +1,14 @@ + +set hive.query.results.cache.enabled=true; +set hive.query.results.cache.max.entry.lifetime=2; + +-- This query used the cache from results_cache_1.q. Load it up. 
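+-- With hive.query.results.cache.max.entry.lifetime=2, the entry created here expires after 2 seconds.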
+select count(*) from src a join src b on (a.key = b.key); + +-- Make sure we are past the cache entry lifetime +select reflect("java.lang.Thread", 'sleep', cast(2000 as bigint)); + +set test.comment="Cached entry should be expired - query should not use cache"; +set test.comment; +explain +select count(*) from src a join src b on (a.key = b.key); diff --git a/ql/src/test/queries/clientpositive/results_cache_temptable.q b/ql/src/test/queries/clientpositive/results_cache_temptable.q new file mode 100644 index 0000000..73832a0 --- /dev/null +++ b/ql/src/test/queries/clientpositive/results_cache_temptable.q @@ -0,0 +1,41 @@ + +create table rct (key string, value string); +load data local inpath '../../data/files/kv1.txt' overwrite into table rct; + +create table rct_part (key string, value string) partitioned by (ds string); +load data local inpath '../../data/files/kv1.txt' overwrite into table rct_part partition (ds="2008-04-08"); +load data local inpath '../../data/files/kv1.txt' overwrite into table rct_part partition (ds="2008-04-09"); + +create temporary table tmptab as select * from src; + +select count(*) from tmptab where key = 0; +set test.comment="Query involving temp tables should not be added to the cache"; +set test.comment; +explain +select count(*) from tmptab where key = 0; + +-- A cached query should not be used if one of the tables used now resolves to a temp table. +select count(*) from rct where key = 0; +set test.comment="Query should use the cache"; +set test.comment; +explain +select count(*) from rct where key = 0; +-- Create temp table with same name, which takes precedence over the non-temp table. +create temporary table rct as select * from tmptab; +set test.comment="Cached query does not get used now that it resolves to a temp table"; +set test.comment; +explain +select count(*) from rct where key = 0; + +-- Try with partitioned table +select count(*) from rct_part where ds="2008-04-08" and key = 0; +set test.comment="Query should use the cache"; +set test.comment; +explain +select count(*) from rct_part where ds="2008-04-08" and key = 0; +-- Create temp table with same name, which takes precedence over the non-temp table. 
+create temporary table rct_part as select key, value, "2008-04-08" as ds from tmptab; +set test.comment="Cached query does not get used now that it resolves to a temp table"; +set test.comment; +explain +select count(*) from rct_part where ds="2008-04-08" and key = 0; diff --git a/ql/src/test/queries/clientpositive/results_cache_with_masking.q b/ql/src/test/queries/clientpositive/results_cache_with_masking.q new file mode 100644 index 0000000..b4fcdd5 --- /dev/null +++ b/ql/src/test/queries/clientpositive/results_cache_with_masking.q @@ -0,0 +1,17 @@ + +set hive.mapred.mode=nonstrict; +set hive.security.authorization.manager=org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactoryForTest; + +set hive.query.results.cache.enabled=true; + +create table masking_test as select cast(key as int) as key, value from src; + +explain +select key, count(*) from masking_test group by key; +select key, count(*) from masking_test group by key; + +-- This time we should use the cache +explain +select key, count(*) from masking_test group by key; +select key, count(*) from masking_test group by key; + diff --git a/ql/src/test/results/clientpositive/llap/results_cache_1.q.out b/ql/src/test/results/clientpositive/llap/results_cache_1.q.out new file mode 100644 index 0000000..c3920c4 --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/results_cache_1.q.out @@ -0,0 +1,462 @@ +PREHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: llap + LLAP IO: no inputs + Map 4 + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 
(type: string) + 1 _col0 (type: string) + Statistics: Num rows: 809 Data size: 6472 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1028 +test.comment="Cache should be used for this query" +PREHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +POSTHOOK: query: select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +1028 +test.comment="Cache is disabled, should not be used here." 
+PREHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: llap + LLAP IO: no inputs + Map 4 + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + Statistics: Num rows: 809 Data size: 6472 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: create database db1 +PREHOOK: type: CREATEDATABASE +PREHOOK: Output: database:db1 +POSTHOOK: query: create database db1 +POSTHOOK: type: CREATEDATABASE +POSTHOOK: Output: database:db1 +PREHOOK: query: use db1 +PREHOOK: type: SWITCHDATABASE +PREHOOK: Input: database:db1 +POSTHOOK: 
query: use db1 +POSTHOOK: type: SWITCHDATABASE +POSTHOOK: Input: database:db1 +PREHOOK: query: create table src as select key, value from default.src +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@src +PREHOOK: Output: database:db1 +PREHOOK: Output: db1@src +POSTHOOK: query: create table src as select key, value from default.src +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@src +POSTHOOK: Output: database:db1 +POSTHOOK: Output: db1@src +POSTHOOK: Lineage: src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +test.comment="Same query string, but different current database. Cache should not be used since unqualified tablenames resolve to different tables" +PREHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 500 Data size: 87584 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 475 Data size: 83204 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 475 Data size: 83204 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 475 Data size: 83204 Basic stats: COMPLETE Column stats: NONE + Execution mode: llap + LLAP IO: no inputs + Map 4 + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 500 Data size: 87584 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 475 Data size: 83204 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 475 Data size: 83204 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 475 Data size: 83204 Basic stats: COMPLETE Column stats: NONE + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + Statistics: Num rows: 522 Data size: 91524 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: 
Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select * from src where key = 0 +union all +select * from src where key = 2 +PREHOOK: type: QUERY +PREHOOK: Input: db1@src +#### A masked pattern was here #### +POSTHOOK: query: select * from src where key = 0 +union all +select * from src where key = 2 +POSTHOOK: type: QUERY +POSTHOOK: Input: db1@src +#### A masked pattern was here #### +0 val_0 +0 val_0 +0 val_0 +2 val_2 +test.comment="Union all. Cache should be used now" +PREHOOK: query: explain +select * from src where key = 0 +union all +select * from src where key = 2 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select * from src where key = 0 +union all +select * from src where key = 2 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select * from src where key = 0 +union all +select * from src where key = 2 +PREHOOK: type: QUERY +PREHOOK: Input: db1@src +POSTHOOK: query: select * from src where key = 0 +union all +select * from src where key = 2 +POSTHOOK: type: QUERY +POSTHOOK: Input: db1@src +0 val_0 +0 val_0 +0 val_0 +2 val_2 +PREHOOK: query: with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: db1@src +#### A masked pattern was here #### +POSTHOOK: query: with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: db1@src +#### A masked pattern was here #### +0 0 +2 2 +4 4 +5 5 +8 8 +9 9 +test.comment="CTE. 
Cache should be used now" +PREHOOK: query: explain +with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key +PREHOOK: type: QUERY +POSTHOOK: query: explain +with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: db1@src +POSTHOOK: query: with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: db1@src +0 0 +2 2 +4 4 +5 5 +8 8 +9 9 diff --git a/ql/src/test/results/clientpositive/results_cache_1.q.out b/ql/src/test/results/clientpositive/results_cache_1.q.out new file mode 100644 index 0000000..fb28a11 --- /dev/null +++ b/ql/src/test/results/clientpositive/results_cache_1.q.out @@ -0,0 +1,453 @@ +PREHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + 
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1028 +test.comment="Cache should be used for this query" +PREHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1028 +test.comment="Cache is disabled, should not be used here." 
+PREHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: create database db1 +PREHOOK: type: CREATEDATABASE +PREHOOK: Output: database:db1 +POSTHOOK: query: create database db1 +POSTHOOK: type: CREATEDATABASE +POSTHOOK: Output: database:db1 +PREHOOK: query: use db1 +PREHOOK: type: SWITCHDATABASE +PREHOOK: Input: database:db1 +POSTHOOK: query: use db1 +POSTHOOK: type: SWITCHDATABASE +POSTHOOK: Input: database:db1 +PREHOOK: 
query: create table src as select key, value from default.src +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@src +PREHOOK: Output: database:db1 +PREHOOK: Output: db1@src +POSTHOOK: query: create table src as select key, value from default.src +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@src +POSTHOOK: Output: database:db1 +POSTHOOK: Output: db1@src +POSTHOOK: Lineage: src.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +test.comment="Same query string, but different current database. Cache should not be used since unqualified tablenames resolve to different tables" +PREHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num 
rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select * from src where key = 0 +union all +select * from src where key = 2 +PREHOOK: type: QUERY +PREHOOK: Input: db1@src +#### A masked pattern was here #### +POSTHOOK: query: select * from src where key = 0 +union all +select * from src where key = 2 +POSTHOOK: type: QUERY +POSTHOOK: Input: db1@src +#### A masked pattern was here #### +0 val_0 +0 val_0 +0 val_0 +2 val_2 +test.comment="Union all. Cache should be used now" +PREHOOK: query: explain +select * from src where key = 0 +union all +select * from src where key = 2 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select * from src where key = 0 +union all +select * from src where key = 2 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select * from src where key = 0 +union all +select * from src where key = 2 +PREHOOK: type: QUERY +PREHOOK: Input: db1@src +#### A masked pattern was here #### +POSTHOOK: query: select * from src where key = 0 +union all +select * from src where key = 2 +POSTHOOK: type: QUERY +POSTHOOK: Input: db1@src +#### A masked pattern was here #### +0 val_0 +0 val_0 +0 val_0 +2 val_2 +PREHOOK: query: with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: db1@src +#### A masked pattern was here #### +POSTHOOK: query: with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: db1@src +#### A masked pattern was here #### +0 0 +2 2 +4 4 +5 5 +8 8 +9 9 +test.comment="CTE. 
Cache should be used now" +PREHOOK: query: explain +with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key +PREHOOK: type: QUERY +POSTHOOK: query: explain +with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: db1@src +#### A masked pattern was here #### +POSTHOOK: query: with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: db1@src +#### A masked pattern was here #### +0 0 +2 2 +4 4 +5 5 +8 8 +9 9 diff --git a/ql/src/test/results/clientpositive/results_cache_2.q.out b/ql/src/test/results/clientpositive/results_cache_2.q.out new file mode 100644 index 0000000..ab1b0de --- /dev/null +++ b/ql/src/test/results/clientpositive/results_cache_2.q.out @@ -0,0 +1,176 @@ +PREHOOK: query: explain +select key, value from src where key=0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select key, value from src where key=0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(key) = 0.0) (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + ListSink + +PREHOOK: query: select key, value from src where key=0 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select key, value from src where key=0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +0 val_0 +0 val_0 +0 val_0 +test.comment=Query only requires fetch task - should not use results cache +PREHOOK: query: explain +select key, value from src where key=0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select key, value from src where key=0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(key) = 0.0) (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + ListSink + +PREHOOK: query: select c1, count(*) +from (select sign(value) c1, value from src where key < 10) q +group by c1 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select c1, count(*) 
+from (select sign(value) c1, value from src where key < 10) q +group by c1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +NULL 10 +test.comment=This query should use the cache +PREHOOK: query: explain +select c1, count(*) +from (select sign(value) c1, value from src where key < 10) q +group by c1 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select c1, count(*) +from (select sign(value) c1, value from src where key < 10) q +group by c1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select c1, count(*) +from (select current_timestamp c1, value from src where key < 10) q +group by c1 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select c1, count(*) +from (select current_timestamp c1, value from src where key < 10) q +group by c1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +2012-01-01 01:02:03 10 +test.comment=Queries using non-deterministic functions should not use results cache +PREHOOK: query: explain +select c1, count(*) +from (select current_timestamp c1, value from src where key < 10) q +group by c1 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select c1, count(*) +from (select current_timestamp c1, value from src where key < 10) q +group by c1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(key) < 10.0) (type: boolean) + Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + keys: 2012-01-01 01:02:03.0 (type: timestamp) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: timestamp) + sort order: + + Map-reduce partition columns: _col0 (type: timestamp) + Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: timestamp) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: 2012-01-01 01:02:03.0 (type: timestamp), _col1 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + diff --git a/ql/src/test/results/clientpositive/results_cache_capacity.q.out b/ql/src/test/results/clientpositive/results_cache_capacity.q.out new file 
mode 100644 index 0000000..695d47d --- /dev/null +++ b/ql/src/test/results/clientpositive/results_cache_capacity.q.out @@ -0,0 +1,238 @@ +PREHOOK: query: select key, count(*) from src where key = 0 group by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select key, count(*) from src where key = 0 group by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +0 3 +test.comment=Q1 should be cached +PREHOOK: query: explain +select key, count(*) from src where key = 0 group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select key, count(*) from src where key = 0 group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select key, count(*) from src where key = 2 group by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select key, count(*) from src where key = 2 group by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +2 1 +test.comment=Q2 should now be cached +PREHOOK: query: explain +select key, count(*) from src where key = 2 group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select key, count(*) from src where key = 2 group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +test.comment=Q1 should still be cached +PREHOOK: query: explain +select key, count(*) from src where key = 0 group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select key, count(*) from src where key = 0 group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select key, count(*) from src where key = 4 group by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select key, count(*) from src where key = 4 group by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +4 1 +test.comment=Q3 should now be cached +PREHOOK: query: explain +select key, count(*) from src where key = 4 group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select key, count(*) from src where key = 4 group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +test.comment=Q1 should still be cached +PREHOOK: query: explain +select key, count(*) from src where key = 0 group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select key, count(*) from src where key = 0 group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +test.comment=Q2 should no longer be in the cache +PREHOOK: query: explain +select key, count(*) from src where key = 2 group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select key, count(*) from src where key = 2 group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on 
stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(key) = 2.0) (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + keys: key (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select key, count(*) from src where key < 10 group by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select key, count(*) from src where key < 10 group by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +0 3 +2 1 +4 1 +5 3 +8 1 +9 1 +PREHOOK: query: explain +select key, count(*) from src where key < 10 group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select key, count(*) from src where key < 10 group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(key) < 10.0) (type: boolean) + Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + keys: key (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch 
Operator + limit: -1 + Processor Tree: + ListSink + diff --git a/ql/src/test/results/clientpositive/results_cache_lifetime.q.out b/ql/src/test/results/clientpositive/results_cache_lifetime.q.out new file mode 100644 index 0000000..ea5d7e0 --- /dev/null +++ b/ql/src/test/results/clientpositive/results_cache_lifetime.q.out @@ -0,0 +1,112 @@ +PREHOOK: query: select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1028 +PREHOOK: query: select reflect("java.lang.Thread", 'sleep', cast(2000 as bigint)) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +POSTHOOK: query: select reflect("java.lang.Thread", 'sleep', cast(2000 as bigint)) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +null +test.comment="Cached entry should be expired - query should not use cache" +PREHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from src a join src b on (a.key = b.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + sort 
order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + diff --git a/ql/src/test/results/clientpositive/results_cache_temptable.q.out b/ql/src/test/results/clientpositive/results_cache_temptable.q.out new file mode 100644 index 0000000..5350dba --- /dev/null +++ b/ql/src/test/results/clientpositive/results_cache_temptable.q.out @@ -0,0 +1,293 @@ +PREHOOK: query: create table rct (key string, value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@rct +POSTHOOK: query: create table rct (key string, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@rct +PREHOOK: query: load data local inpath '../../data/files/kv1.txt' overwrite into table rct +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@rct +POSTHOOK: query: load data local inpath '../../data/files/kv1.txt' overwrite into table rct +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@rct +PREHOOK: query: create table rct_part (key string, value string) partitioned by (ds string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@rct_part +POSTHOOK: query: create table rct_part (key string, value string) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@rct_part +PREHOOK: query: load data local inpath '../../data/files/kv1.txt' overwrite into table rct_part partition (ds="2008-04-08") +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@rct_part +POSTHOOK: query: load data local inpath '../../data/files/kv1.txt' overwrite into table rct_part partition (ds="2008-04-08") +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@rct_part +POSTHOOK: Output: default@rct_part@ds=2008-04-08 +PREHOOK: query: load data local inpath '../../data/files/kv1.txt' overwrite into table rct_part partition (ds="2008-04-09") +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@rct_part +POSTHOOK: query: load data local inpath '../../data/files/kv1.txt' overwrite into table rct_part partition (ds="2008-04-09") +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@rct_part +POSTHOOK: Output: default@rct_part@ds=2008-04-09 +PREHOOK: query: create temporary table tmptab as select * from src +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@src +PREHOOK: Output: database:default +PREHOOK: Output: default@tmptab +POSTHOOK: query: create temporary table tmptab as select * from src +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@src +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tmptab +PREHOOK: query: select count(*) from tmptab where key = 0 
+PREHOOK: type: QUERY +PREHOOK: Input: default@tmptab +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tmptab where key = 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tmptab +#### A masked pattern was here #### +3 +test.comment="Query involving temp tables should not be added to the cache" +PREHOOK: query: explain +select count(*) from tmptab where key = 0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from tmptab where key = 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: tmptab + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(key) = 0.0) (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from rct where key = 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@rct +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from rct where key = 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@rct +#### A masked pattern was here #### +3 +test.comment="Query should use the cache" +PREHOOK: query: explain +select count(*) from rct where key = 0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from rct where key = 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: create temporary table rct as select * from tmptab +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@tmptab +PREHOOK: Output: database:default +PREHOOK: Output: default@rct +POSTHOOK: query: create temporary table rct as select * from tmptab +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@tmptab +POSTHOOK: Output: database:default +POSTHOOK: Output: default@rct +test.comment="Cached query does not get used now that it resolves to a temp table" +PREHOOK: query: explain +select count(*) from rct where key = 0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from rct where key = 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: rct + Statistics: Num rows: 500 Data size: 5312 Basic 
stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(key) = 0.0) (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from rct_part where ds="2008-04-08" and key = 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@rct_part +PREHOOK: Input: default@rct_part@ds=2008-04-08 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from rct_part where ds="2008-04-08" and key = 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@rct_part +POSTHOOK: Input: default@rct_part@ds=2008-04-08 +#### A masked pattern was here #### +3 +test.comment="Query should use the cache" +PREHOOK: query: explain +select count(*) from rct_part where ds="2008-04-08" and key = 0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from rct_part where ds="2008-04-08" and key = 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: create temporary table rct_part as select key, value, "2008-04-08" as ds from tmptab +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@tmptab +PREHOOK: Output: database:default +PREHOOK: Output: default@rct_part +POSTHOOK: query: create temporary table rct_part as select key, value, "2008-04-08" as ds from tmptab +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@tmptab +POSTHOOK: Output: database:default +POSTHOOK: Output: default@rct_part +test.comment="Cached query does not get used now that it resolves to a temp table" +PREHOOK: query: explain +select count(*) from rct_part where ds="2008-04-08" and key = 0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from rct_part where ds="2008-04-08" and key = 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: rct_part + Statistics: Num rows: 500 Data size: 10812 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((UDFToDouble(key) = 0.0) and (ds = '2008-04-08')) (type: boolean) + Statistics: Num rows: 125 Data size: 2703 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 125 Data size: 2703 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + 
outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + diff --git a/ql/src/test/results/clientpositive/results_cache_with_masking.q.out b/ql/src/test/results/clientpositive/results_cache_with_masking.q.out new file mode 100644 index 0000000..3a0b2cc --- /dev/null +++ b/ql/src/test/results/clientpositive/results_cache_with_masking.q.out @@ -0,0 +1,106 @@ +PREHOOK: query: create table masking_test as select cast(key as int) as key, value from src +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@src +PREHOOK: Output: database:default +PREHOOK: Output: default@masking_test +POSTHOOK: query: create table masking_test as select cast(key as int) as key, value from src +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@src +POSTHOOK: Output: database:default +POSTHOOK: Output: default@masking_test +POSTHOOK: Lineage: masking_test.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: masking_test.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: explain +select key, count(*) from masking_test group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select key, count(*) from masking_test group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: masking_test + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (((key % 2) = 0) and (key < 10)) (type: boolean) + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + keys: key (type: int) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: int) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 41 Data size: 435 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 41 Data size: 435 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + 
+ Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select key, count(*) from masking_test group by key +PREHOOK: type: QUERY +PREHOOK: Input: default@masking_test +#### A masked pattern was here #### +POSTHOOK: query: select key, count(*) from masking_test group by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@masking_test +#### A masked pattern was here #### +0 3 +2 1 +4 1 +8 1 +PREHOOK: query: explain +select key, count(*) from masking_test group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select key, count(*) from masking_test group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select key, count(*) from masking_test group by key +PREHOOK: type: QUERY +PREHOOK: Input: default@masking_test +#### A masked pattern was here #### +POSTHOOK: query: select key, count(*) from masking_test group by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@masking_test +#### A masked pattern was here #### +0 3 +2 1 +4 1 +8 1
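The three golden files above all exercise the same check: run a query once, then re-run EXPLAIN on the identical query and see whether the plan collapses to a single Fetch stage annotated with "Cached Query Result: true". As a rough, non-authoritative sketch of the lifetime test implied by results_cache_lifetime.q.out (the driving .q file is not part of this section, and any cache settings it applies are assumed, not shown), the flow is approximately:

-- Hypothetical reconstruction of the results_cache_lifetime test flow; the
-- real .q file and the cache settings it configures are not in this section.

-- First execution computes the join and should populate the results cache.
select count(*) from src a join src b on (a.key = b.key);

-- Sleep for 2 seconds via reflect() so that an (assumed) short cache entry
-- lifetime elapses before the query is issued again.
select reflect("java.lang.Thread", 'sleep', cast(2000 as bigint));

-- With the entry expired, EXPLAIN on the same query is expected to show the
-- full two-stage Map Reduce plan rather than "Cached Query Result: true".
explain
select count(*) from src a join src b on (a.key = b.key);

The temp-table and masking outputs follow the same pattern: the cache is consulted only when the identical query text resolves to the same non-temporary tables, which is why redefining rct or rct_part as a temporary table forces the subsequent EXPLAIN back to a full Map Reduce plan, while the repeated masking_test aggregation is served from the cache.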