diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
index ecf27ceabe..af0f87bac3 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
@@ -67,4 +67,20 @@
   public static final String HS2_EXECUTING_QUERIES = "hs2_executing_queries";
   public static final String HS2_FAILED_QUERIES = "hs2_failed_queries";
   public static final String HS2_SUCCEEDED_QUERIES = "hs2_succeeded_queries";
+
+  public static final String QC_MAX_SIZE = "qc_max_size";
+  public static final String QC_CURRENT_SIZE = "qc_current_size";
+  public static final String QC_VALID_ENTRIES = "qc_valid_entries";
+  public static final String QC_LOOKUPS = "qc_lookups";
+  public static final String QC_VALID_HITS = "qc_valid_hits";
+  public static final String QC_PENDING_HITS = "qc_pending_hits";
+  public static final String QC_PENDING_FAILS = "qc_pending_fails";
+  public static final String QC_PENDING_FAILS_WAIT_TIME = "qc_pending_fails_wait_time";
+  public static final String QC_PENDING_SUCCESS_WAIT_TIME = "qc_pending_success_wait_time";
+  // Queries rejected from being cached due to non-deterministic functions, temp tables, or other conditions.
+  public static final String QC_INVALID_FOR_CACHING = "qc_invalid_for_caching";
+  // Queries rejected from being cached because they exceeded the max cache entry size.
+  public static final String QC_REJECTED_TOO_LARGE = "qc_rejected_too_large";
+  public static final String QC_TOTAL_ENTRIES_ADDED = "qc_total_entries_added";
+
 }
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index 4fa1044232..ac5ae573d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -48,6 +48,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -213,11 +217,13 @@ private void invalidate() {
           invalidationFuture.cancel(false);
         }
         cleanupIfNeeded();
+        decrementMetric(MetricsConstant.QC_VALID_ENTRIES);
       } else if (prevStatus == CacheEntryStatus.PENDING) {
         // Need to notify any queries waiting on the change from pending status.
        synchronized (this) {
          this.notifyAll();
        }
+        incrementMetric(MetricsConstant.QC_PENDING_FAILS);
      }
    }
 
@@ -267,12 +273,21 @@ public boolean waitForValidStatus() {
       LOG.info("Waiting on pending cacheEntry");
       long timeout = 1000;
 
+      long startTime = System.nanoTime();
+      long endTime;
+
       while (true) {
         try {
           switch (status) {
           case VALID:
+            endTime = System.nanoTime();
+            incrementMetric(MetricsConstant.QC_PENDING_SUCCESS_WAIT_TIME,
+                TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS));
             return true;
           case INVALID:
+            endTime = System.nanoTime();
+            incrementMetric(MetricsConstant.QC_PENDING_FAILS_WAIT_TIME,
+                TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS));
             return false;
           case PENDING:
             // Status has not changed, continue waiting.
@@ -344,6 +359,11 @@ public static void initialize(HiveConf conf) throws IOException {
     if (!inited.getAndSet(true)) {
       try {
         instance = new QueryResultsCache(conf);
+
+        Metrics metrics = MetricsFactory.getInstance();
+        if (metrics != null) {
+          registerMetrics(metrics, instance);
+        }
       } catch (IOException err) {
         inited.set(false);
         throw err;
@@ -369,6 +389,7 @@ public CacheEntry lookup(LookupInfo request) {
 
     LOG.debug("QueryResultsCache lookup for query: {}", request.queryText);
 
+    boolean foundPending = false;
     Lock readLock = rwLock.readLock();
     try {
       readLock.lock();
@@ -390,6 +411,7 @@ public CacheEntry lookup(LookupInfo request) {
       // Try to find valid entry, but settle for pending entry if that is all we have.
       if (result == null && pendingResult != null) {
         result = pendingResult;
+        foundPending = true;
       }
 
       if (result != null) {
@@ -401,6 +423,14 @@ public CacheEntry lookup(LookupInfo request) {
     }
 
     LOG.debug("QueryResultsCache lookup result: {}", result);
+    incrementMetric(MetricsConstant.QC_LOOKUPS);
+    if (result != null) {
+      if (foundPending) {
+        incrementMetric(MetricsConstant.QC_PENDING_HITS);
+      } else {
+        incrementMetric(MetricsConstant.QC_VALID_HITS);
+      }
+    }
 
     return result;
   }
@@ -500,6 +530,9 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
       synchronized (cacheEntry) {
         cacheEntry.notifyAll();
       }
+
+      incrementMetric(MetricsConstant.QC_VALID_ENTRIES);
+      incrementMetric(MetricsConstant.QC_TOTAL_ENTRIES_ADDED);
     } catch (Exception err) {
       LOG.error("Failed to create cache entry for query results for query: " + queryText, err);
 
@@ -614,6 +647,7 @@ private boolean shouldEntryBeAdded(CacheEntry entry, long size) {
     // Assumes the cache lock has already been taken.
     if (maxEntrySize >= 0 && size > maxEntrySize) {
       LOG.debug("Cache entry size {} larger than max entry size ({})", size, maxEntrySize);
+      incrementMetric(MetricsConstant.QC_REJECTED_TOO_LARGE);
       return false;
     }
 
@@ -726,4 +760,45 @@ public void run() {
       });
     }
   }
+
+  public static void incrementMetric(String name, long count) {
+    Metrics metrics = MetricsFactory.getInstance();
+    if (metrics != null) {
+      metrics.incrementCounter(name, count);
+    }
+  }
+
+  public static void decrementMetric(String name, long count) {
+    Metrics metrics = MetricsFactory.getInstance();
+    if (metrics != null) {
+      metrics.decrementCounter(name, count);
+    }
+  }
+
+  public static void incrementMetric(String name) {
+    incrementMetric(name, 1);
+  }
+
+  public static void decrementMetric(String name) {
+    decrementMetric(name, 1);
+  }
+
+  private static void registerMetrics(Metrics metrics, final QueryResultsCache cache) {
+    MetricsVariable<Long> maxCacheSize = new MetricsVariable<Long>() {
+      @Override
+      public Long getValue() {
+        return cache.maxCacheSize;
+      }
+    };
+
+    MetricsVariable<Long> curCacheSize = new MetricsVariable<Long>() {
+      @Override
+      public Long getValue() {
+        return cache.cacheSize;
+      }
+    };
+
+    metrics.addGauge(MetricsConstant.QC_MAX_SIZE, maxCacheSize);
+    metrics.addGauge(MetricsConstant.QC_CURRENT_SIZE, curCacheSize);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 99e2c72d21..ff0a2e6a1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -64,6 +64,7 @@
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.StatsSetupConst.StatDB;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
@@ -14585,11 +14586,13 @@ private boolean queryCanBeCached() {
     // The query materialization validation check only occurs in CBO. Thus only cache results if CBO was used.
     if (!ctx.isCboSucceeded()) {
       LOG.info("Caching of query results is disabled if CBO was not run.");
+      QueryResultsCache.incrementMetric(MetricsConstant.QC_INVALID_FOR_CACHING);
       return false;
     }
 
     if (!isValidQueryMaterialization()) {
       LOG.info("Not eligible for results caching - {}", getInvalidQueryMaterializationReason());
+      QueryResultsCache.incrementMetric(MetricsConstant.QC_INVALID_FOR_CACHING);
       return false;
     }
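
A minimal smoke-test sketch for the metrics wiring above (an illustration, not part of the patch): it assumes the CodahaleMetrics backend enabled via hive.server2.metrics.enabled, a local filesystem on which QueryResultsCache.initialize() can create its results cache directory, and a hypothetical driver class name. Only APIs visible in the patch plus CodahaleMetrics.dumpJson() are used.

import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;

// Hypothetical driver class, for illustration only.
public class QueryResultsCacheMetricsSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true);

    // Order matters: MetricsFactory.init() must run before
    // QueryResultsCache.initialize(), since registerMetrics() is only
    // invoked when MetricsFactory.getInstance() returns non-null.
    MetricsFactory.init(conf);
    QueryResultsCache.initialize(conf);

    // The static helpers are safe no-ops when metrics are disabled.
    QueryResultsCache.incrementMetric(MetricsConstant.QC_LOOKUPS);
    QueryResultsCache.incrementMetric(MetricsConstant.QC_PENDING_SUCCESS_WAIT_TIME, 25);

    // With the Codahale implementation the registry can be dumped as JSON:
    // qc_lookups should now be 1, and the qc_max_size gauge should reflect
    // the configured results cache capacity.
    CodahaleMetrics metrics = (CodahaleMetrics) MetricsFactory.getInstance();
    System.out.println(metrics.dumpJson());
  }
}

Note the design choice this sketch exercises: every call site guards on MetricsFactory.getInstance() rather than caching the Metrics object, so the counters degrade to no-ops instead of throwing when the metrics subsystem is not initialized.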