diff --git a/common/pom.xml b/common/pom.xml index 2292fdf..cd14581 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -239,6 +239,23 @@ + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + *.xml + + + + + diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java index 52d99e4..9be9b50 100644 --- a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.common.metrics; import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsScope; import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.apache.hadoop.hive.conf.HiveConf; @@ -56,7 +57,7 @@ private LegacyMetrics() { * (i) a "number of calls" counter ( <name>.n ), and * (ii) a "number of msecs spent between scope open and close" counter. 
( <name>.t) */ - public static class MetricsScope { + public static class LegacyMetricsScope implements MetricsScope { final LegacyMetrics metrics; @@ -73,7 +74,7 @@ private LegacyMetrics() { * @param name - name of the variable * @throws IOException */ - private MetricsScope(String name, LegacyMetrics metrics) throws IOException { + private LegacyMetricsScope(String name, LegacyMetrics metrics) throws IOException { this.metrics = metrics; this.name = name; this.numCounter = name + ".n"; @@ -150,11 +151,11 @@ public void reopen() throws IOException { } } - private static final ThreadLocal> threadLocalScopes - = new ThreadLocal>() { + private static final ThreadLocal> threadLocalScopes + = new ThreadLocal>() { @Override - protected HashMap initialValue() { - return new HashMap(); + protected HashMap initialValue() { + return new HashMap(); } }; @@ -212,15 +213,15 @@ public Object get(String name) throws IOException{ return metrics.get(name); } - public void startScope(String name) throws IOException{ + public void startStoredScope(String name) throws IOException{ if (threadLocalScopes.get().containsKey(name)) { threadLocalScopes.get().get(name).open(); } else { - threadLocalScopes.get().put(name, new MetricsScope(name, this)); + threadLocalScopes.get().put(name, new LegacyMetricsScope(name, this)); } } - public MetricsScope getScope(String name) throws IOException { + public MetricsScope getStoredScope(String name) throws IOException { if (threadLocalScopes.get().containsKey(name)) { return threadLocalScopes.get().get(name); } else { @@ -228,13 +229,19 @@ public MetricsScope getScope(String name) throws IOException { } } - public void endScope(String name) throws IOException{ + public void endStoredScope(String name) throws IOException{ if (threadLocalScopes.get().containsKey(name)) { threadLocalScopes.get().get(name).close(); } } + public MetricsScope createScope(String name) throws IOException { + return new LegacyMetricsScope(name, this); + } + public void 
endScope(MetricsScope scope) throws IOException { + ((LegacyMetricsScope) scope).close(); + } /** * Resets the static context state to initial. diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java index 49b2b32..4297233 100644 --- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java @@ -28,20 +28,40 @@ */ public interface Metrics { - //Must declare CTOR taking in HiveConf. - /** * Deinitializes the Metrics system. */ public void close() throws Exception; /** + * + * @param name starts a scope of a given name. Scopes is stored as thread-local variable. + * @throws IOException + */ + public void startStoredScope(String name) throws IOException; + + /** + * Closes the stored scope of a given name. + * Note that this must be called on the same thread as where the scope was started. + * @param name + * @throws IOException + */ + public void endStoredScope(String name) throws IOException; + + /** + * Create scope with given name and returns it. * @param name + * @return * @throws IOException */ - public void startScope(String name) throws IOException; + public MetricsScope createScope(String name) throws IOException; - public void endScope(String name) throws IOException; + /** + * Close the given scope. 
+ * @param scope + * @throws IOException + */ + public void endScope(MetricsScope scope) throws IOException; //Counter-related methods diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java index a5aa995..f18aa6e 100644 --- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java @@ -38,4 +38,10 @@ public static final String ZOOKEEPER_HIVE_SHAREDLOCKS = "zookeeper_hive_sharedlocks"; public static final String ZOOKEEPER_HIVE_EXCLUSIVELOCKS = "zookeeper_hive_exclusivelocks"; public static final String ZOOKEEPER_HIVE_SEMISHAREDLOCKS = "zookeeper_hive_semisharedlocks"; + + public static final String EXEC_ASYNC_QUEUE_SIZE = "exec_async_queue_size"; + public static final String EXEC_ASYNC_POOL_SIZE = "exec_async_pool_size"; + + public static final String OPERATION_PREFIX = "hs2_operation_"; + public static final String COMPLETED_OPERATION_PREFIX = "hs2_completed_operation_"; } diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsScope.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsScope.java new file mode 100644 index 0000000..3d6a23e --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsScope.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.metrics.common; + +/** + * Metrics Scope to represent duration of an event. + * + * Implementation can capture information like the average duration of open scopes, + * number of open scopes, number of completed scopes. + * + * Scopes are created via the Metrics framework (see Metrics#createScope or Metrics#startStoredScope) + * + * Scope may be stored by the Metrics framework via 'storedScope' concept for further reference. + * + * In either case, it is the caller's responsibility to end the scope via the Metrics framework (see Metrics#endScope) + */ +public interface MetricsScope { +} diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java index 3db26af..cba1c5a 100644 --- a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.common.metrics.common.MetricsScope; import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.apache.hadoop.hive.conf.HiveConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +73,7 @@ */ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.common.Metrics { public static final String API_PREFIX = "api_"; + public static final String 
ACTIVE_CALLS = "active_calls_"; public static final Logger LOGGER = LoggerFactory.getLogger(CodahaleMetrics.class); public final MetricRegistry metricRegistry = new MetricRegistry(); @@ -86,15 +88,15 @@ private HiveConf conf; private final Set reporters = new HashSet(); - private final ThreadLocal> threadLocalScopes - = new ThreadLocal>() { + private final ThreadLocal> threadLocalScopes + = new ThreadLocal>() { @Override - protected HashMap initialValue() { - return new HashMap(); + protected HashMap initialValue() { + return new HashMap(); } }; - public static class MetricsScope { + public static class CodahaleMetricsScope implements MetricsScope { final String name; final Timer timer; @@ -108,7 +110,7 @@ * @param name - name of the variable * @throws IOException */ - private MetricsScope(String name, CodahaleMetrics metrics) throws IOException { + private CodahaleMetricsScope(String name, CodahaleMetrics metrics) throws IOException { this.name = name; this.metrics = metrics; this.timer = metrics.getTimer(name); @@ -124,6 +126,7 @@ public void open() throws IOException { if (!isOpen) { isOpen = true; this.timerContext = timer.time(); + metrics.incrementCounter(ACTIVE_CALLS + name); } else { throw new IOException("Scope named " + name + " is not closed, cannot be opened."); } @@ -136,7 +139,7 @@ public void open() throws IOException { public void close() throws IOException { if (isOpen) { timerContext.close(); - + metrics.decrementCounter(ACTIVE_CALLS + name); } else { throw new IOException("Scope named " + name + " is not open, cannot be closed."); } @@ -210,23 +213,41 @@ public void close() throws Exception { } @Override - public void startScope(String name) throws IOException { + public void startStoredScope(String name) throws IOException { name = API_PREFIX + name; if (threadLocalScopes.get().containsKey(name)) { threadLocalScopes.get().get(name).open(); } else { - threadLocalScopes.get().put(name, new MetricsScope(name, this)); + 
threadLocalScopes.get().put(name, new CodahaleMetricsScope(name, this)); } } @Override - public void endScope(String name) throws IOException { + public void endStoredScope(String name) throws IOException { name = API_PREFIX + name; if (threadLocalScopes.get().containsKey(name)) { threadLocalScopes.get().get(name).close(); + threadLocalScopes.get().remove(name); + } + } + + public MetricsScope getStoredScope(String name) throws IOException { + if (threadLocalScopes.get().containsKey(name)) { + return threadLocalScopes.get().get(name); + } else { + throw new IOException("No metrics scope named " + name); } } + public MetricsScope createScope(String name) throws IOException { + name = API_PREFIX + name; + return new CodahaleMetricsScope(name, this); + } + + public void endScope(MetricsScope scope) throws IOException { + ((CodahaleMetricsScope) scope).close(); + } + @Override public Long incrementCounter(String name) throws IOException { return incrementCounter(name, 1L); diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java index 548d7db..1ef636c 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java +++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java @@ -18,11 +18,14 @@ package org.apache.hadoop.hive.ql.log; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -116,6 +119,7 @@ public void PerfLogBegin(String callerName, String method) { long startTime = System.currentTimeMillis(); LOG.info(""); startTimes.put(method, new Long(startTime)); + beginMetrics(method); } /** * Call this function in correspondence of PerfLogBegin to mark the end of the 
measurement. @@ -156,6 +160,8 @@ public long PerfLogEnd(String callerName, String method, String additionalInfo) sb.append(">"); LOG.info(sb.toString()); + endMetrics(method); + return duration; } @@ -193,4 +199,25 @@ public Long getDuration(String method) { return duration; } + private void beginMetrics(String method) { + Metrics metrics = MetricsFactory.getInstance(); + try { + if (metrics != null) { + metrics.startStoredScope(method); + } + } catch (IOException e) { + LOG.warn("Error recording metrics", e); + } + } + + private void endMetrics(String method) { + Metrics metrics = MetricsFactory.getInstance(); + try { + if (metrics != null) { + metrics.endStoredScope(method); + } + } catch (IOException e) { + LOG.warn("Error recording metrics", e); + } + } } diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/MetricsTestUtils.java b/common/src/test/org/apache/hadoop/hive/common/metrics/MetricsTestUtils.java new file mode 100644 index 0000000..fd420f7 --- /dev/null +++ b/common/src/test/org/apache/hadoop/hive/common/metrics/MetricsTestUtils.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.common.metrics; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * Utilities for codahale metrics verification. + */ +public class MetricsTestUtils { + + public static final MetricsCategory COUNTER = new MetricsCategory("counters", "count"); + public static final MetricsCategory TIMER = new MetricsCategory("timers", "count"); + public static final MetricsCategory GAUGE = new MetricsCategory("gauges", "value"); + + static class MetricsCategory { + String category; + String metricsHandle; + MetricsCategory(String category, String metricsHandle) { + this.category = category; + this.metricsHandle = metricsHandle; + } + } + + public static void verifyMetricFile(File jsonReportFile, MetricsCategory category, String metricsName, + Object value) throws Exception { + JsonNode jsonNode = getJsonNode(jsonReportFile, category, metricsName); + Assert.assertEquals(jsonNode.asText(), value.toString()); + } + + private static JsonNode getJsonNode(File jsonReportFile, MetricsCategory category, String metricsName) throws Exception { + byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath())); + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode rootNode = objectMapper.readTree(jsonData); + JsonNode categoryNode = rootNode.path(category.category); + JsonNode metricsNode = categoryNode.path(metricsName); + return metricsNode.path(category.metricsHandle); + } +} diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java index c3e8282..a3fb04f 100644 --- a/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java +++ b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java 
@@ -32,7 +32,7 @@ import javax.management.ObjectName; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; -import org.apache.hadoop.hive.common.metrics.LegacyMetrics.MetricsScope; +import org.apache.hadoop.hive.common.metrics.common.MetricsScope; import org.apache.hadoop.hive.conf.HiveConf; import org.junit.After; import org.junit.Before; @@ -124,8 +124,8 @@ public void testMetricsMBean() throws Exception { @Test public void testScopeSingleThread() throws Exception { - metrics.startScope(scopeName); - final MetricsScope fooScope = metrics.getScope(scopeName); + metrics.startStoredScope(scopeName); + final LegacyMetrics.LegacyMetricsScope fooScope = (LegacyMetrics.LegacyMetricsScope) metrics.getStoredScope(scopeName); // the time and number counters become available only after the 1st // scope close: expectIOE(new Callable() { @@ -151,15 +151,15 @@ public Void call() throws Exception { } }); - assertSame(fooScope, metrics.getScope(scopeName)); + assertSame(fooScope, metrics.getStoredScope(scopeName)); Thread.sleep(periodMs+ 1); // 1st close: // closing of open scope should be ok: - metrics.endScope(scopeName); + metrics.endStoredScope(scopeName); expectIOE(new Callable() { @Override public Void call() throws Exception { - metrics.endScope(scopeName); // closing of closed scope not allowed + metrics.endStoredScope(scopeName); // closing of closed scope not allowed return null; } }); @@ -168,15 +168,15 @@ public Void call() throws Exception { final long t1 = fooScope.getTimeCounter().longValue(); assertTrue(t1 > periodMs); - assertSame(fooScope, metrics.getScope(scopeName)); + assertSame(fooScope, metrics.getStoredScope(scopeName)); // opening allowed after closing: - metrics.startScope(scopeName); + metrics.startStoredScope(scopeName); // opening of already open scope not allowed: expectIOE(new Callable() { @Override public Void call() throws Exception { - metrics.startScope(scopeName); + metrics.startStoredScope(scopeName); return null; } }); @@ 
-184,7 +184,7 @@ public Void call() throws Exception { assertEquals(Long.valueOf(1), fooScope.getNumCounter()); assertEquals(t1, fooScope.getTimeCounter().longValue()); - assertSame(fooScope, metrics.getScope(scopeName)); + assertSame(fooScope, metrics.getStoredScope(scopeName)); Thread.sleep(periodMs + 1); // Reopening (close + open) allowed in opened state: fooScope.reopen(); @@ -204,8 +204,8 @@ public Void call() throws Exception { @Test public void testScopeConcurrency() throws Exception { - metrics.startScope(scopeName); - MetricsScope fooScope = metrics.getScope(scopeName); + metrics.startStoredScope(scopeName); + LegacyMetrics.LegacyMetricsScope fooScope = (LegacyMetrics.LegacyMetricsScope) metrics.getStoredScope(scopeName); final int threads = 10; ExecutorService executorService = Executors.newFixedThreadPool(threads); for (int i=0; i 3 * periodMs * threads); Double avgT = (Double) metrics.get("foo.avg_t"); assertTrue(avgT.doubleValue() > periodMs); - metrics.endScope(scopeName); + metrics.endStoredScope(scopeName); } void testScopeImpl(int n) throws Exception { - metrics.startScope(scopeName); - final MetricsScope fooScope = metrics.getScope(scopeName); + metrics.startStoredScope(scopeName); + final LegacyMetrics.LegacyMetricsScope fooScope = (LegacyMetrics.LegacyMetricsScope) metrics.getStoredScope(scopeName); // cannot open scope that is already open: expectIOE(new Callable() { @Override @@ -241,10 +241,10 @@ public Void call() throws Exception { } }); - assertSame(fooScope, metrics.getScope(scopeName)); + assertSame(fooScope, metrics.getStoredScope(scopeName)); Thread.sleep(periodMs+ 1); // 1st close: - metrics.endScope(scopeName); // closing of open scope should be ok. + metrics.endStoredScope(scopeName); // closing of open scope should be ok. 
assertTrue(fooScope.getNumCounter().longValue() >= 1); final long t1 = fooScope.getTimeCounter().longValue(); @@ -253,15 +253,15 @@ public Void call() throws Exception { expectIOE(new Callable() { @Override public Void call() throws Exception { - metrics.endScope(scopeName); // closing of closed scope not allowed + metrics.endStoredScope(scopeName); // closing of closed scope not allowed return null; } }); - assertSame(fooScope, metrics.getScope(scopeName)); + assertSame(fooScope, metrics.getStoredScope(scopeName)); // opening allowed after closing: - metrics.startScope(scopeName); + metrics.startStoredScope(scopeName); assertTrue(fooScope.getNumCounter().longValue() >= 1); assertTrue(fooScope.getTimeCounter().longValue() >= t1); @@ -270,12 +270,12 @@ public Void call() throws Exception { expectIOE(new Callable() { @Override public Void call() throws Exception { - metrics.startScope(scopeName); + metrics.startStoredScope(scopeName); return null; } }); - assertSame(fooScope, metrics.getScope(scopeName)); + assertSame(fooScope, metrics.getStoredScope(scopeName)); Thread.sleep(periodMs + 1); // Reopening (close + open) allowed in opened state: fooScope.reopen(); diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java index a3aa549..27825b1 100644 --- a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java +++ b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java @@ -77,8 +77,8 @@ public void after() throws Exception { public void testScope() throws Exception { int runs = 5; for (int i = 0; i < runs; i++) { - MetricsFactory.getInstance().startScope("method1"); - MetricsFactory.getInstance().endScope("method1"); + MetricsFactory.getInstance().startStoredScope("method1"); + MetricsFactory.getInstance().endStoredScope("method1"); } Timer timer = 
metricRegistry.getTimers().get("api_method1"); @@ -106,8 +106,8 @@ public void testConcurrency() throws Exception { executorService.submit(new Callable() { @Override public Void call() throws Exception { - MetricsFactory.getInstance().startScope("method2"); - MetricsFactory.getInstance().endScope("method2"); + MetricsFactory.getInstance().startStoredScope("method2"); + MetricsFactory.getInstance().endStoredScope("method2"); return null; } }); diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml index c202001..1809cd8 100644 --- a/itests/hive-unit/pom.xml +++ b/itests/hive-unit/pom.xml @@ -146,6 +146,13 @@ ${project.version} tests + + org.apache.hive + hive-common + ${project.version} + test-jar + test + junit diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java new file mode 100644 index 0000000..8fc8841 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.jdbc.miniHS2; + +import org.apache.hadoop.hive.common.metrics.MetricsTestUtils; +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook; +import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hive.service.cli.CLIServiceClient; +import org.apache.hive.service.cli.SessionHandle; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests HiveServer2 metrics. + */ +public class TestHs2Metrics { + + private static MiniHS2 miniHS2; + private static Map confOverlay; + private static File jsonReportFile; + private static Object monitor = new Object(); + + //Check metrics during semantic analysis. 
+ public static class MetricCheckingHook implements HiveSemanticAnalyzerHook { + @Override + public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, + ASTNode ast) throws SemanticException { + try { + //Pre-analyze hook is fired in the middle of these calls + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "active_calls_api_semanticAnalyze", 1); + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "active_calls_api_compile", 1); + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "active_calls_api_hs2_operation_RUNNING", 1); + } catch (Exception e) { + throw new SemanticException("metrics verification failed", e); + } + return ast; + } + + @Override + public void postAnalyze(HiveSemanticAnalyzerHookContext context, + List> rootTasks) throws SemanticException { + } + } + + @BeforeClass + public static void setup() throws Exception { + miniHS2 = new MiniHS2(new HiveConf()); + confOverlay = new HashMap(); + confOverlay.put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + confOverlay.put(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, MetricCheckingHook.class.getName()); + miniHS2.start(confOverlay); + + //for Metrics. MiniHS2 init code-path doesn't go through HiveServer2.startHiveServer2(). 
+ File workDir = new File(System.getProperty("test.tmp.dir")); + jsonReportFile = new File(workDir, "json_reporting"); + jsonReportFile.delete(); + HiveConf conf = new HiveConf(); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name()); + conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString()); + conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms"); + MetricsFactory.init(conf); + } + + @Test + public void testMetrics() throws Exception { + String tableName = "testMetrics"; + CLIServiceClient serviceClient = miniHS2.getServiceClient(); + SessionHandle sessHandle = serviceClient.openSession("foo", "bar"); + + //Block on semantic analysis to check 'active_calls' + serviceClient.executeStatement(sessHandle, "CREATE TABLE " + tableName + " (id INT)", confOverlay); + Thread.sleep(2000); + + //check that all calls were recorded. + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.TIMER, "api_hs2_operation_INITIALIZED", 1); + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.TIMER, "api_hs2_operation_PENDING", 1); + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.TIMER, "api_hs2_operation_RUNNING", 1); + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "hs2_completed_operation_FINISHED", 1); + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.TIMER, "api_Driver.run", 1); + + //but there should be no more active calls. 
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "active_calls_api_semanticAnalyze", 0); + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "active_calls_api_compile", 0); + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "active_calls_api_hs2_operation_RUNNING", 0); + + } + +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 3c40d6e..a835f6a 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -676,7 +676,7 @@ private String startFunction(String function, String extraLogInfo) { function + extraLogInfo); if (MetricsFactory.getInstance() != null) { try { - MetricsFactory.getInstance().startScope(function); + MetricsFactory.getInstance().startStoredScope(function); } catch (IOException e) { LOG.debug("Exception when starting metrics scope" + e.getClass().getName() + " " + e.getMessage(), e); @@ -720,7 +720,7 @@ private void endFunction(String function, boolean successful, Exception e, private void endFunction(String function, MetaStoreEndFunctionContext context) { if (MetricsFactory.getInstance() != null) { try { - MetricsFactory.getInstance().endScope(function); + MetricsFactory.getInstance().endStoredScope(function); } catch (IOException e) { LOG.debug("Exception when closing metrics scope" + e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 93c7a54..75e9027 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1548,7 +1548,6 @@ public int execute() throws CommandNeedRetryException { // Launch upto maxthreads tasks Task task; while ((task = driverCxt.getRunnable(maxthreads)) != null) { - 
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId()); TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt); if (!runner.isRunning()) { break; diff --git a/service/pom.xml b/service/pom.xml index 22234d9..afa52cf 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -121,6 +121,13 @@ test tests + + org.apache.hive + hive-common + ${project.version} + test + test-jar + junit diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 4ca0561..ade913e 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -19,13 +19,17 @@ import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.util.EnumSet; +import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Sets; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.common.metrics.common.MetricsScope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -45,6 +49,7 @@ public abstract class Operation { protected final HiveSession parentSession; private OperationState state = OperationState.INITIALIZED; + private MetricsScope currentStateScope; private final OperationHandle opHandle; private HiveConf configuration; public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT; @@ -70,6 +75,7 @@ protected Operation(HiveSession parentSession, OperationType opType, boolean run lastAccessTime = System.currentTimeMillis(); operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(), 
HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS); + setMetrics(state); } public Future getBackgroundHandle() { @@ -128,6 +134,7 @@ public OperationLog getOperationLog() { protected final OperationState setState(OperationState newState) throws HiveSQLException { state.validateTransition(newState); this.state = newState; + setMetrics(state); this.lastAccessTime = System.currentTimeMillis(); return this.state; } @@ -330,4 +337,40 @@ protected HiveSQLException toSQLException(String prefix, CommandProcessorRespons } return ex; } + + //list of operation states to measure duration of. + protected static Set scopeStates = Sets.immutableEnumSet( + OperationState.INITIALIZED, + OperationState.PENDING, + OperationState.RUNNING + ); + + //list of terminal operation states. We measure only completed counts for operations in these states. + protected static Set terminalStates = Sets.immutableEnumSet( + OperationState.CLOSED, + OperationState.CANCELED, + OperationState.FINISHED, + OperationState.ERROR, + OperationState.UNKNOWN + ); + + protected void setMetrics(OperationState state) { + Metrics metrics = MetricsFactory.getInstance(); + if (metrics != null) { + try { + if (currentStateScope != null) { + metrics.endScope(currentStateScope); + currentStateScope = null; + } + if (scopeStates.contains(state)) { + currentStateScope = metrics.createScope(MetricsConstant.OPERATION_PREFIX + state.toString()); + } + if (terminalStates.contains(state)) { + metrics.incrementCounter(MetricsConstant.COMPLETED_OPERATION_PREFIX + state.toString()); + } + } catch (IOException e) { + LOG.warn("Error metrics", e); + } + } + } } diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index a9b4334..d11cf3d 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ 
b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -25,13 +25,19 @@ import java.util.Date; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -108,8 +114,9 @@ private void createBackgroundOperationPool() { // Threads terminate when they are idle for more than the keepAliveTime // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize String threadPoolName = "HiveServer2-Background-Pool"; + final BlockingQueue queue = new LinkedBlockingQueue(poolQueueSize); backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize, - keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(poolQueueSize), + keepAliveTime, TimeUnit.SECONDS, queue, new ThreadFactoryWithGarbageCleanup(threadPoolName)); backgroundOperationPool.allowCoreThreadTimeOut(true); @@ -119,6 +126,22 @@ private void createBackgroundOperationPool() { hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); checkOperation = HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION); + + Metrics m = MetricsFactory.getInstance(); + if (m != null) { + m.addGauge(MetricsConstant.EXEC_ASYNC_QUEUE_SIZE, new MetricsVariable() { + @Override + public Object getValue() { + return queue.size(); + } + }); + 
m.addGauge(MetricsConstant.EXEC_ASYNC_POOL_SIZE, new MetricsVariable() { + @Override + public Object getValue() { + return backgroundOperationPool.getPoolSize(); + } + }); + } } private void initOperationLogRootDir() { diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java new file mode 100644 index 0000000..aaeecbe --- /dev/null +++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.session; + +import org.apache.hadoop.hive.common.metrics.MetricsTestUtils; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.server.HiveServer2; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; + +/** + * Test metrics from SessionManager. 
+ */ +public class TestSessionManagerMetrics { + + private static SessionManager sm; + private static File jsonReportFile; + + @BeforeClass + public static void setup() throws Exception { + HiveConf conf = new HiveConf(); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS, 2); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE, 10); + conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, "1000000s"); + + File workDir = new File(System.getProperty("test.tmp.dir")); + jsonReportFile = new File(workDir, "json_reporting"); + jsonReportFile.delete(); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name()); + conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString()); + conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms"); + MetricsFactory.init(conf); + + HiveServer2 hs2 = new HiveServer2(); + sm = new SessionManager(hs2); + sm.init(conf); + } + + final Object barrier = new Object(); + + class BarrierRunnable implements Runnable { + @Override + public void run() { + synchronized (barrier) { + try { + barrier.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + + /** + * Tests metrics regarding async thread pool. 
+ */ + @Test + public void testThreadPoolMetrics() throws Exception { + + sm.submitBackgroundOperation(new BarrierRunnable()); + sm.submitBackgroundOperation(new BarrierRunnable()); + sm.submitBackgroundOperation(new BarrierRunnable()); + sm.submitBackgroundOperation(new BarrierRunnable()); + + Thread.sleep(2000); + + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_POOL_SIZE, 2); + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_QUEUE_SIZE, 2); + + synchronized (barrier) { + barrier.notifyAll(); + } + Thread.sleep(2000); + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_POOL_SIZE, 2); + MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_QUEUE_SIZE, 0); + } +}