diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java index 8fa5cbf..6f9347c 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java +++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java @@ -21,14 +21,19 @@ import com.google.common.collect.ImmutableMap; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.common.metrics.common.MetricsScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; +import java.util.Set; /** * PerfLogger. @@ -122,8 +127,8 @@ public void PerfLogBegin(String callerName, String method) { startTimes.put(method, new Long(startTime)); if (LOG.isDebugEnabled()) { LOG.debug(""); - beginMetrics(method); } + beginMetrics(method); } /** * Call this function in correspondence of PerfLogBegin to mark the end of the measurement. @@ -162,9 +167,8 @@ public long PerfLogEnd(String callerName, String method, String additionalInfo) } sb.append(">"); LOG.debug(sb.toString()); - - endMetrics(method); } + endMetrics(method); return duration; } @@ -202,11 +206,24 @@ public Long getDuration(String method) { return duration; } + + public ImmutableMap getStartTimes() { + return ImmutableMap.copyOf(startTimes); + } + + public ImmutableMap getEndTimes() { + return ImmutableMap.copyOf(endTimes); + } + + //Methods for metrics integration. Each thread-local PerfLogger will open/close scope during each perf-log method. + Map openScopes = new HashMap(); + private void beginMetrics(String method) { Metrics metrics = MetricsFactory.getInstance(); try { if (metrics != null) { - metrics.startStoredScope(method); + MetricsScope scope = metrics.createScope(method); + openScopes.put(method, scope); } } catch (IOException e) { LOG.warn("Error recording metrics", e); @@ -217,18 +234,30 @@ private void endMetrics(String method) { Metrics metrics = MetricsFactory.getInstance(); try { if (metrics != null) { - metrics.endStoredScope(method); + MetricsScope scope = openScopes.remove(method); + if (scope != null) { + metrics.endScope(scope); + } } } catch (IOException e) { LOG.warn("Error recording metrics", e); } } - public ImmutableMap getStartTimes() { - return ImmutableMap.copyOf(startTimes); - } - - public ImmutableMap getEndTimes() { - return ImmutableMap.copyOf(endTimes); + /** + * Cleans up any dangling perfLog metric call scopes. + */ + public void cleanupPerfLogMetrics() { + Metrics metrics = MetricsFactory.getInstance(); + try { + if (metrics != null) { + for (MetricsScope openScope : openScopes.values()) { + metrics.endScope(openScope); + } + } + } catch (IOException e) { + LOG.warn("Error cleaning up metrics", e); + } + openScopes.clear(); } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java index ca21bb6..98cb3ec 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java @@ -42,7 +42,6 @@ @BeforeClass public static void before() throws Exception { - int port = MetaStoreUtils.findFreePort(); hiveConf = new HiveConf(TestMetaStoreMetrics.class); @@ -51,11 +50,16 @@ public static void before() throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, true); hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + MetricsFactory.close(); + MetricsFactory.init(hiveConf); + metrics = (CodahaleMetrics) MetricsFactory.getInstance(); + + //Increments one HMS connection MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf); + + //Increments one HMS connection (Hive.get()) SessionState.start(new CliSessionState(hiveConf)); driver = new Driver(hiveConf); - - metrics = (CodahaleMetrics) MetricsFactory.getInstance(); } @@ -63,8 +67,9 @@ public static void before() throws Exception { public void testMethodCounts() throws Exception { driver.run("show databases"); String json = metrics.dumpJson(); - MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, "api_get_all_databases", 1); + //one call by init, one called here. + MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, "api_get_all_databases", 2); } @Test diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java index 6a98968..7337e9c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hive.service.cli.CLIServiceClient; import org.apache.hive.service.cli.SessionHandle; +import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -43,7 +45,6 @@ private static MiniHS2 miniHS2; private static Map confOverlay; - private static CodahaleMetrics metrics; //Check metrics during semantic analysis. public static class MetricCheckingHook implements HiveSemanticAnalyzerHook { @@ -79,11 +80,14 @@ public static void setup() throws Exception { confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED.varname, "true"); confOverlay.put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); miniHS2.start(confOverlay); - - HiveConf conf = new HiveConf(); + } - metrics = (CodahaleMetrics) MetricsFactory.getInstance(); + @Before + public void before() throws Exception { + HiveConf conf = new HiveConf(); + MetricsFactory.close(); + MetricsFactory.init(conf); } @Test @@ -112,6 +116,35 @@ public void testMetrics() throws Exception { MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "active_calls_api_compile", 0); MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "active_calls_api_hs2_operation_RUNNING", 0); MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "active_calls_api_hs2_sql_operation_RUNNING", 0); + + serviceClient.closeSession(sessHandle); + } + + @Test + public void testClosedScopes() throws Exception { + CLIServiceClient serviceClient = miniHS2.getServiceClient(); + SessionHandle sessHandle = serviceClient.openSession("foo", "bar"); + + //this should error at analyze scope + Exception expectedException = null; + try { + serviceClient.executeStatement(sessHandle, "select aaa", confOverlay); + } catch (Exception e) { + expectedException = e; + } + Assert.assertNotNull("Expected semantic exception", expectedException); + + //verify all scopes were recorded + CodahaleMetrics metrics = (CodahaleMetrics) MetricsFactory.getInstance(); + String json = metrics.dumpJson(); + MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, "api_parse", 1); + MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER, "api_semanticAnalyze", 1); + + //verify all scopes are closed. + MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "active_calls_api_parse", 0); + MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, "active_calls_api_semanticAnalyze", 0); + + serviceClient.closeSession(sessHandle); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index abf94ff..bd510d6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1458,6 +1458,7 @@ private boolean isExplicitLockOperation() { } private CommandProcessorResponse createProcessorResponse(int ret) { + SessionState.getPerfLogger().cleanupPerfLogMetrics(); queryDisplay.setErrorMessage(errorMessage); return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError); }